<a href="https://colab.research.google.com/github/Saheer7/Pyspark/blob/master/4_Spark_Data_Frame_Basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=7de871179ee3889a6dca172a98249cbc5cff6c3ef04de46615443cdd64ba72ea
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1



# Spark DataFrame basics

In [3]:
from pyspark.sql import SparkSession    # Entry point for working with DataFrames

In [4]:
spark = SparkSession.builder.appName("Basics").getOrCreate()   # Start a Spark session, or reuse one that already exists

In [None]:
df = spark.read.json('/content/sample_data/people.json')   # Read the input file into a DataFrame

In [10]:
df.show()   # Display the table; Spark automatically shows missing data as null

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [11]:
df.printSchema()   # Column names and data types of the DataFrame

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [13]:
df.columns      # This is an attribute, so no parentheses are needed

['age', 'name']

In [16]:
df.describe()  #Statistical summary of dataframe

DataFrame[summary: string, age: string, name: string]

In [17]:
df.describe().show()   #use show() to display the summary

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



DEFINING YOUR OWN SCHEMA:
The schema of a DataFrame has to be correct, i.e. the data type of each field must be appropriate for the data. Follow the steps below carefully:

In [28]:
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

In [33]:
# Create a list of StructField objects.
# A StructField takes 3 parameters: the column name, the data type, and whether the field is nullable.

data_schema = [StructField('age', IntegerType(), True),
               StructField('name', StringType(), True)]
# This defines a column 'age' of type integer that is allowed to contain null values.
# NOTE: The data type must be an instance, so write it with parentheses: IntegerType(), not IntegerType

In [36]:
final_struct = StructType(fields=data_schema)   

In [37]:
df = spark.read.json('/content/sample_data/people.json', schema=final_struct)   # Read the input again, this time with the custom schema

In [38]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



SELECTING OR GRABBING THE DATA:

In [39]:
df['age']   

Column<b'age'>

In [40]:
type(df['age'])  #Column object

pyspark.sql.column.Column

In [41]:
df.select(['age'])

DataFrame[age: int]

In [44]:
type(df.select(['age']))  #Data frame object

pyspark.sql.dataframe.DataFrame

In [45]:
df.select(['age']).show()    #Displaying column

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [53]:
df.head(2)   # First 2 rows, returned as a list of Row objects

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [54]:
df.head(2)[0]   #Selecting first row

Row(age=None, name='Michael')

In [55]:
type(df.head(2)[0])  

pyspark.sql.types.Row

WHY ARE THERE SO MANY SPECIALIZED OBJECTS IN SPARK?

Spark uses specialized objects (Column, DataFrame, Row) because it has to read from a distributed data source and 
then map that work out to distributed computing.

In [57]:
#SELECTING MULTIPLE COLUMNS IN THE DATA
df.select(['age','name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [59]:
#CREATING NEW COLUMN
# withColumn returns a new DataFrame by adding a column or replacing an existing column of the same name

df.withColumn('newage',df['age']).show()   #Create copy of age column

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



In [61]:
df.withColumn('double_age',df['age']*2).show()  #Double age

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [62]:
df.show()   # The changes above are not permanent; they only display the result of each operation [assign to a variable to keep it]

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [63]:
#RENAME COLUMN NAME
df.withColumnRenamed('age','my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



USING SQL TO INTERACT WITH DATAFRAMES

---



In [71]:
#REGISTER DATAFRAME AS SQL TEMPORARY VIEW

df.createOrReplaceTempView('people_view')    #giving some name to view

#Creates view or replaces if it exists

In [72]:
results = spark.sql("SELECT * FROM PEOPLE_VIEW")

In [73]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [74]:
new_results = spark.sql("SELECT * FROM people_view WHERE age = 30")   # Query the view registered above
new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



SPARK DATA FRAME OPERATIONS

---



In [5]:
df= spark.read.csv('/content/sample_data/appl_stock.csv',inferSchema=True,header=True) 

# With inferSchema=True, Spark goes through the input once to determine the type of each column.
# To avoid that extra pass over the data, leave inferSchema disabled (every column is then read as a string) or pass an explicit schema.
# [NOTE: inferSchema is an option of the CSV reader and is not available for JSON]
# header=True means the first row contains the column names

In [6]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [7]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [9]:
df.head(3)[0]

Row(Date='2010-01-04', Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

FILTERING DATA

Filtering the SQL way

---



In [10]:
df.filter("Close < 500").show()   #Closing price less than 500

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [15]:
df.filter("Close < 500").select('Open').show()  #Open price where closing price is less than 500

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [16]:
df.filter("Close < 500").select(['Open','Close']).show()  #Open & Close price where closing price is less than 500

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



Filtering the Python way

---



In [17]:
df.filter(df['Close']<500).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [18]:
df.filter(df['Close']<500).select('Volume').show()

+---------+
|   Volume|
+---------+
|123432400|
|150476200|
|138040000|
|119282800|
|111902700|
|115557400|
|148614900|
|151473000|
|108223500|
|148516900|
|182501900|
|153038200|
|152038600|
|220441900|
|266424900|
|466777500|
|430642100|
|293375600|
|311488100|
|187469100|
+---------+
only showing top 20 rows



In [19]:
#Filtering based on multiple conditions

df.filter(df['Close']<200 and df['Open']>200).show()   # WRONG: Python's "and" cannot combine Column expressions and raises a ValueError

ValueError: ignored

In [21]:
df.filter((df['Close']<200) & (df['Open']>200)).show()   # RIGHT: wrap each condition in parentheses and use "&" instead of "and" ("|" for OR)

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+
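
Why `&` works where `and` does not comes down to Python operator overloading: `&` can be redefined by a class, while `and` always asks the object for a plain True or False. The toy class below is a hypothetical, heavily simplified stand-in for a Spark Column (it is not PySpark's implementation) that reproduces the behaviour in pure Python:

```python
class ToyColumn:
    """Hypothetical, simplified stand-in for a Spark Column expression."""

    def __init__(self, expr):
        self.expr = expr

    def __lt__(self, other):
        return ToyColumn(f"({self.expr} < {other})")

    def __gt__(self, other):
        return ToyColumn(f"({self.expr} > {other})")

    def __and__(self, other):
        # `&` can be overloaded, so it can build a combined expression.
        return ToyColumn(f"({self.expr} AND {other.expr})")

    def __bool__(self):
        # `and`, `or`, and `not` call bool() on the operand, which has no
        # sensible answer for an expression that has not been evaluated yet.
        raise ValueError("Cannot convert column into bool")


close, open_ = ToyColumn("Close"), ToyColumn("Open")

combined = (close < 200) & (open_ > 200)
print(combined.expr)

error_message = None
try:
    (close < 200) and (open_ > 200)
except ValueError as err:
    error_message = str(err)
    print("and failed:", error_message)
```

The parentheses matter because `&` binds more tightly than `<` and `>` in Python, so without them the expression would be grouped as `close < (200 & open_) > 200`.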



In [22]:
df.filter((df['Close']<200) & ~(df['Open']>200)).show()   # ~ implies NOT 

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.323710000000002|
|2010-02-08|        195.690006|197.88000300000002|      

In [23]:
df.filter(df['Low']==197.16).show()   # show() only displays the result, like printing; it does not give you data to work with

+----------+------------------+----------+------+------+---------+---------+
|      Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+----------+------------------+----------+------+------+---------+---------+
|2010-01-22|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+----------+------------------+----------+------+------+---------+---------+



In [25]:
# To store the output in a variable and keep working with it, use collect()
res = df.filter(df['Low']==197.16).collect()
res   # A list of Row objects containing the result

[Row(Date='2010-01-22', Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [26]:
res[0]  #The first item

Row(Date='2010-01-22', Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [27]:
row= res[0]

In [28]:
#Converting row to Dictionary
row.asDict()

{'Adj Close': 25.620401,
 'Close': 197.75,
 'Date': '2010-01-22',
 'High': 207.499996,
 'Low': 197.16,
 'Open': 206.78000600000001,
 'Volume': 220441900}

In [30]:
row.asDict()['Volume']

220441900

GROUPBY and AGGREGATE FUNCTIONS