In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Dataframe_1").getOrCreate()

In [3]:
spark

In [4]:
employee = spark.read.csv("emp1.csv", header = True, inferSchema = True )

In [5]:
employee.show()

+------+-------+----+-------+----+---+
|emp_id|   name|dept|salary | age|sex|
+------+-------+----+-------+----+---+
|     1|  argha|sale|  10000|  25|  M|
|     2|  sohag|tech|  20000|  25|  M|
|     7|  soura|tech|  30000|  25|  M|
|     4| shivam|tech|  10000|  26|  F|
|     5|  lohit|  hr|  40000|  26|  F|
|     6|prakash|  hr|  50000|  26|  M|
|     8|sourabh|tech|  60000|null|  F|
|    10|  biswa|sale|  50000|null|  M|
|     7|   null|tech|  30000|  25|  M|
|     6|prakash|null|  50000|  26|  M|
|     4| shivam|tech|  10000|  26|  F|
+------+-------+----+-------+----+---+



In [6]:
employee.columns

['emp_id', 'name', 'dept', 'salary ', 'age', 'sex']

In [7]:
employee.describe().show()

+-------+-----------------+-------+----+------------------+------------------+----+
|summary|           emp_id|   name|dept|           salary |               age| sex|
+-------+-----------------+-------+----+------------------+------------------+----+
|  count|               11|     10|  10|                11|                 9|  11|
|   mean|5.454545454545454|   null|null|32727.272727272728|25.555555555555557|null|
| stddev|2.621588692515909|   null|null|18488.325554743507|0.5270462766947303|null|
|    min|                1|  argha|  hr|             10000|                25|   F|
|    max|               10|sourabh|tech|             60000|                26|   M|
+-------+-----------------+-------+----+------------------+------------------+----+



In [8]:
employee.printSchema()

root
 |-- emp_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary : integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)



In [9]:
employee.show(3)

+------+-----+----+-------+---+---+
|emp_id| name|dept|salary |age|sex|
+------+-----+----+-------+---+---+
|     1|argha|sale|  10000| 25|  M|
|     2|sohag|tech|  20000| 25|  M|
|     7|soura|tech|  30000| 25|  M|
+------+-----+----+-------+---+---+
only showing top 3 rows



## Change Datatypes

In [10]:
from pyspark.sql.types import (StructField, IntegerType, 
                               StringType, StructType)


In [11]:
data_schema = [StructField("emp_id", IntegerType(), True),
               StructField("name", StringType(), True),
               StructField("dept", StringType(), True),
               StructField("salary ", IntegerType(), True),
               StructField("age", IntegerType(), True),
               StructField("sex", StringType(), True)]

In [12]:
final_struc = StructType(fields = data_schema  )

In [13]:
employee = spark.read.csv("emp1.csv", schema = final_struc , header = True  )

In [14]:
employee.printSchema()

root
 |-- emp_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- salary : integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)



## Column Operations

In [15]:
employee.show()

+------+-------+----+-------+----+---+
|emp_id|   name|dept|salary | age|sex|
+------+-------+----+-------+----+---+
|     1|  argha|sale|  10000|  25|  M|
|     2|  sohag|tech|  20000|  25|  M|
|     7|  soura|tech|  30000|  25|  M|
|     4| shivam|tech|  10000|  26|  F|
|     5|  lohit|  hr|  40000|  26|  F|
|     6|prakash|  hr|  50000|  26|  M|
|     8|sourabh|tech|  60000|null|  F|
|    10|  biswa|sale|  50000|null|  M|
|     7|   null|tech|  30000|  25|  M|
|     6|prakash|null|  50000|  26|  M|
|     4| shivam|tech|  10000|  26|  F|
+------+-------+----+-------+----+---+



In [16]:
employee = employee.withColumn("new_age", employee["age"] * 3 )


In [17]:
employee.show()

+------+-------+----+-------+----+---+-------+
|emp_id|   name|dept|salary | age|sex|new_age|
+------+-------+----+-------+----+---+-------+
|     1|  argha|sale|  10000|  25|  M|     75|
|     2|  sohag|tech|  20000|  25|  M|     75|
|     7|  soura|tech|  30000|  25|  M|     75|
|     4| shivam|tech|  10000|  26|  F|     78|
|     5|  lohit|  hr|  40000|  26|  F|     78|
|     6|prakash|  hr|  50000|  26|  M|     78|
|     8|sourabh|tech|  60000|null|  F|   null|
|    10|  biswa|sale|  50000|null|  M|   null|
|     7|   null|tech|  30000|  25|  M|     75|
|     6|prakash|null|  50000|  26|  M|     78|
|     4| shivam|tech|  10000|  26|  F|     78|
+------+-------+----+-------+----+---+-------+



In [18]:
employee = employee.withColumnRenamed("salary ","salary" )

In [19]:
employee.columns

['emp_id', 'name', 'dept', 'salary', 'age', 'sex', 'new_age']

In [20]:
employee.select(["name","age"]).show()

+-------+----+
|   name| age|
+-------+----+
|  argha|  25|
|  sohag|  25|
|  soura|  25|
| shivam|  26|
|  lohit|  26|
|prakash|  26|
|sourabh|null|
|  biswa|null|
|   null|  25|
|prakash|  26|
| shivam|  26|
+-------+----+



# Running SQL queries after converting DataFrame to Temp View

In [21]:
employee.createOrReplaceTempView("people")

In [22]:
results = spark.sql("SELECT * FROM people")
results.show()

+------+-------+----+------+----+---+-------+
|emp_id|   name|dept|salary| age|sex|new_age|
+------+-------+----+------+----+---+-------+
|     1|  argha|sale| 10000|  25|  M|     75|
|     2|  sohag|tech| 20000|  25|  M|     75|
|     7|  soura|tech| 30000|  25|  M|     75|
|     4| shivam|tech| 10000|  26|  F|     78|
|     5|  lohit|  hr| 40000|  26|  F|     78|
|     6|prakash|  hr| 50000|  26|  M|     78|
|     8|sourabh|tech| 60000|null|  F|   null|
|    10|  biswa|sale| 50000|null|  M|   null|
|     7|   null|tech| 30000|  25|  M|     75|
|     6|prakash|null| 50000|  26|  M|     78|
|     4| shivam|tech| 10000|  26|  F|     78|
+------+-------+----+------+----+---+-------+



In [23]:
new_results = spark.sql("SELECT * FROM people WHERE age>25")
new_results.show()

+------+-------+----+------+---+---+-------+
|emp_id|   name|dept|salary|age|sex|new_age|
+------+-------+----+------+---+---+-------+
|     4| shivam|tech| 10000| 26|  F|     78|
|     5|  lohit|  hr| 40000| 26|  F|     78|
|     6|prakash|  hr| 50000| 26|  M|     78|
|     6|prakash|null| 50000| 26|  M|     78|
|     4| shivam|tech| 10000| 26|  F|     78|
+------+-------+----+------+---+---+-------+



# Basic Conditional Operations

In [24]:
employee.filter(employee["dept"] == "tech" ).show()

+------+-------+----+------+----+---+-------+
|emp_id|   name|dept|salary| age|sex|new_age|
+------+-------+----+------+----+---+-------+
|     2|  sohag|tech| 20000|  25|  M|     75|
|     7|  soura|tech| 30000|  25|  M|     75|
|     4| shivam|tech| 10000|  26|  F|     78|
|     8|sourabh|tech| 60000|null|  F|   null|
|     7|   null|tech| 30000|  25|  M|     75|
|     4| shivam|tech| 10000|  26|  F|     78|
+------+-------+----+------+----+---+-------+



In [25]:
apple = spark.read.csv("AAPL_data.csv", inferSchema=True, header = True)

In [26]:
apple.show()

+-------------------+-------+-------+-------+-------+---------+----+
|               date|   open|   high|    low|  close|   volume|Name|
+-------------------+-------+-------+-------+-------+---------+----+
|2013-02-08 00:00:00|67.7142|68.4014|66.8928|67.8542|158168416|AAPL|
|2013-02-11 00:00:00|68.0714|69.2771|67.6071|68.5614|129029425|AAPL|
|2013-02-12 00:00:00|68.5014|68.9114|66.8205|66.8428|151829363|AAPL|
|2013-02-13 00:00:00|66.7442|67.6628|66.1742|66.7156|118721995|AAPL|
|2013-02-14 00:00:00|66.3599|67.3771|66.2885|66.6556| 88809154|AAPL|
|2013-02-15 00:00:00|66.9785|67.1656|65.7028|65.7371| 97924631|AAPL|
|2013-02-19 00:00:00|65.8714|66.1042|64.8356|65.7128|108854046|AAPL|
|2013-02-20 00:00:00|65.3842|65.3842|64.1142|64.1214|118891367|AAPL|
|2013-02-21 00:00:00|63.7142|64.1671|63.2599|63.7228|111596821|AAPL|
|2013-02-22 00:00:00|64.1785|64.5142|63.7999|64.4014| 82583823|AAPL|
|2013-02-25 00:00:00|64.8356|65.0171|63.2242|63.2571| 92899597|AAPL|
|2013-02-26 00:00:00|63.4028|64.50

In [27]:
# row count
apple.count()

1259

In [28]:
# column count
len(apple.columns)

7

In [29]:
apple.describe().show()

+-------+------------------+------------------+------------------+------------------+-------------------+----+
|summary|              open|              high|               low|             close|             volume|Name|
+-------+------------------+------------------+------------------+------------------+-------------------+----+
|  count|              1259|              1259|              1259|              1259|               1259|1259|
|   mean|109.05542891183475|109.95111834789516|108.14158888006372|109.06669849086583|5.404789973550437E7|null|
| stddev| 30.54922002458826|30.686186407983914|30.376223585852472|30.556811676964696|  3.3468353335784E7|null|
|    min|           55.4242|           57.0857|           55.0142|           55.7899|           11475922|AAPL|
|    max|            179.37|             180.1|            178.25|            179.26|          266833581|AAPL|
+-------+------------------+------------------+------------------+------------------+-------------------+----+



In [30]:
apple.filter(apple["open"] > 170).select("volume").show()

+--------+
|  volume|
+--------+
|59398631|
|35026306|
|24361485|
|24409527|
|29185668|
|25145500|
|16982080|
|24782487|
|23637484|
|21899544|
|16262447|
|25131295|
|25588925|
|14026673|
|20716802|
|26428802|
|41666364|
|41527218|
|32542385|
|23355231|
+--------+
only showing top 20 rows



In [31]:
apple.filter((apple["open"]> 100) & (apple["close"]< 100)).show()

+-------------------+------+------+-------+-----+---------+----+
|               date|  open|  high|    low|close|   volume|Name|
+-------------------+------+------+-------+-----+---------+----+
|2014-09-03 00:00:00| 103.1| 103.2|  98.58|98.94|125420521|AAPL|
|2014-09-25 00:00:00|100.51|100.71|  97.72|97.87|100091990|AAPL|
|2014-10-01 00:00:00|100.59|100.69|   98.7|99.18| 51491286|AAPL|
|2014-10-13 00:00:00|101.33|101.78|  99.81|99.81| 53583368|AAPL|
|2014-10-14 00:00:00|100.39|100.52|  98.57|98.75| 63688562|AAPL|
|2016-01-12 00:00:00|100.55|100.69|98.8399|99.96| 49154227|AAPL|
|2016-01-13 00:00:00|100.32|101.19|   97.3|97.39| 62439631|AAPL|
|2016-01-25 00:00:00|101.52|101.53|  99.21|99.44| 51794525|AAPL|
+-------------------+------+------+-------+-----+---------+----+



In [34]:
filter_data = apple.filter((apple["open"]> 100) & (apple["close"]< 100)).collect()

In [42]:
row = filter_data[0]

In [44]:
row.asDict()["close"]

98.94