# Pyspark Basic

In [3]:
#!pip3 install pyspark

In [6]:
#!pip3 install numpy

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [4]:
spark

In [5]:
df_pyspark = spark.read.csv('/home/gourav/Documents/test.csv')

In [11]:
df_pyspark = spark.read.option('header','true').csv('/home/gourav/Documents/test.csv')

In [12]:
df_pyspark.show(2)

+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
|  ID|    TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
|8041|10-02-15 7:51|      20.29|          33.045|    0|         449|  0.004867773|          0|
| 642|05-02-15 4:32|      20.89|            24.2|    0|         448|  0.003692263|          0|
+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
only showing top 2 rows



In [13]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [16]:
df_pyspark.head()

Row(ID='8041', TimeStamp='10-02-15 7:51', Temperature='20.29', RelativeHumidity='33.045', Light='0', OxygenLevels='449', HumidityRatio='0.004867773', GTOccupancy='0')

In [17]:
df_pyspark.printSchema()

root
 |-- ID: string (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- Temperature: string (nullable = true)
 |-- RelativeHumidity: string (nullable = true)
 |-- Light: string (nullable = true)
 |-- OxygenLevels: string (nullable = true)
 |-- HumidityRatio: string (nullable = true)
 |-- GTOccupancy: string (nullable = true)



# PySpark DataFrames

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [3]:
spark

In [8]:
# Read Dataset
df_pyspark = spark.read.option('header','true').csv('/home/gourav/Documents/test.csv', inferSchema=True)

In [9]:
# Check the Schema
df_pyspark.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- RelativeHumidity: double (nullable = true)
 |-- Light: integer (nullable = true)
 |-- OxygenLevels: double (nullable = true)
 |-- HumidityRatio: double (nullable = true)
 |-- GTOccupancy: integer (nullable = true)



In [10]:
# Another way to load the data

In [11]:
df_pyspark = spark.read.csv('/home/gourav/Documents/test.csv', header=True, inferSchema=True)

In [13]:
df_pyspark.show(2)

+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
|  ID|    TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
|8041|10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|
| 642|05-02-15 4:32|      20.89|            24.2|    0|       448.0|  0.003692263|          0|
+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
only showing top 2 rows



In [14]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [15]:
df_pyspark.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- RelativeHumidity: double (nullable = true)
 |-- Light: integer (nullable = true)
 |-- OxygenLevels: double (nullable = true)
 |-- HumidityRatio: double (nullable = true)
 |-- GTOccupancy: integer (nullable = true)



In [16]:
df_pyspark.columns

['ID',
 'TimeStamp',
 'Temperature',
 'RelativeHumidity',
 'Light',
 'OxygenLevels',
 'HumidityRatio',
 'GTOccupancy']

In [19]:
df_pyspark.select('ID').show()

+----+
|  ID|
+----+
|8041|
| 642|
|1243|
|1803|
|3834|
|3545|
|  88|
|6005|
|7400|
|5726|
|5221|
|   3|
|4593|
|2055|
|6591|
|2238|
| 739|
|2748|
|7236|
|  21|
+----+
only showing top 20 rows



In [20]:
type(df_pyspark.select('ID'))

pyspark.sql.dataframe.DataFrame

In [21]:
df_pyspark.select(['ID','Light']).show()

+----+-----+
|  ID|Light|
+----+-----+
|8041|    0|
| 642|    0|
|1243|  441|
|1803|    0|
|3834|  829|
|3545|    0|
|  88|    0|
|6005|    0|
|7400|    0|
|5726|   17|
|5221|   14|
|   3|  426|
|4593|    0|
|2055|    0|
|6591|    0|
|2238|    0|
| 739|    0|
|2748|  494|
|7236|    0|
|  21|    0|
+----+-----+
only showing top 20 rows



In [23]:
df_pyspark.dtypes

[('ID', 'int'),
 ('TimeStamp', 'string'),
 ('Temperature', 'double'),
 ('RelativeHumidity', 'double'),
 ('Light', 'int'),
 ('OxygenLevels', 'double'),
 ('HumidityRatio', 'double'),
 ('GTOccupancy', 'int')]

In [25]:
df_pyspark.describe().show()

+-------+------------------+--------------+------------------+------------------+------------------+-----------------+--------------------+-------------------+
|summary|                ID|     TimeStamp|       Temperature|  RelativeHumidity|             Light|     OxygenLevels|       HumidityRatio|        GTOccupancy|
+-------+------------------+--------------+------------------+------------------+------------------+-----------------+--------------------+-------------------+
|  count|                37|            37|                37|                37|                37|               37|                  37|                 37|
|   mean|3557.3783783783783|          null|20.681554054054054|25.309549549729734| 95.35135135135135|578.1824324405407|0.003823931270270...|0.16216216216216217|
| stddev|2545.7406513555075|          null| 1.056793794159559| 5.322368541365135|206.88016072330592|238.4752296157999|8.578258972956203E-4| 0.3736838766118223|
|    min|                 3|04-02-15 17:

In [30]:
# Adding Columns in dataFrame
df_pyspark = df_pyspark.withColumn('Temperature after 1 year', df_pyspark['Temperature']+2)

In [31]:
df_pyspark.columns

['ID',
 'TimeStamp',
 'Temperature',
 'RelativeHumidity',
 'Light',
 'OxygenLevels',
 'HumidityRatio',
 'GTOccupancy',
 'Temperature after 1 year']

In [32]:
df_pyspark.select(['Temperature', 'Temperature after 1 year']).show()

+-----------+------------------------+
|Temperature|Temperature after 1 year|
+-----------+------------------------+
|      20.29|                   22.29|
|      20.89|                   22.89|
|      22.23|                   24.23|
|      20.29|                   22.29|
|      20.76|                   22.76|
|     19.745|                  21.745|
|       22.1|                    24.1|
|       19.5|                    21.5|
|       20.5|                    22.5|
|     19.445|                  21.445|
|       19.2|                    21.2|
|      23.15|                   25.15|
|      19.79|                   21.79|
|       20.5|                    22.5|
|       19.5|                    21.5|
|       20.2|                    22.2|
|      20.89|                   22.89|
|       22.1|                    24.1|
|       21.7|                    23.7|
|      22.89|                   24.89|
+-----------+------------------------+
only showing top 20 rows



In [34]:
# Drop the columns
df_pyspark= df_pyspark.drop('Temperature after 1 year')

In [36]:
df_pyspark.show()

+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|  ID|     TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|8041| 10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|
| 642| 05-02-15 4:32|      20.89|            24.2|    0|       448.0|  0.003692263|          0|
|1243|05-02-15 14:32|      22.23|           27.73|  441|       932.0|  0.004599115|          1|
|1803|05-02-15 23:53|      20.29|           21.29|    0|       443.0|  0.003127468|          0|
|3834| 07-02-15 9:43|      20.76|     18.85666667|  829| 452.6666667|  0.002850246|          0|
|3545| 07-02-15 4:54|     19.745|           19.39|    0|       441.0|  0.002752165|          0|
|  88|04-02-15 19:18|       22.1|          27.245|    0|       584.5|  0.004482195|          0|
|6005|08-02-15 21:55|       19.5|       

In [39]:
# Rename the columns
df_pyspark = df_pyspark.withColumnRenamed('Temperature', 'Temperature change')

In [40]:
df_pyspark.columns

['ID',
 'TimeStamp',
 'Temperature change',
 'RelativeHumidity',
 'Light',
 'OxygenLevels',
 'HumidityRatio',
 'GTOccupancy']

# Pyspark handling Missing Values

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [2]:
spark

In [3]:
df_pyspark = spark.read.csv('/home/gourav/Documents/test.csv', header=True,
                           inferSchema=True)

In [5]:
df_pyspark.show(2)

+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
|  ID|    TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
|8041|10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|
| 642|05-02-15 4:32|      20.89|            24.2|    0|       448.0|  0.003692263|          0|
+----+-------------+-----------+----------------+-----+------------+-------------+-----------+
only showing top 2 rows



In [8]:
# Drop the columns
df_pyspark.drop('ID').show(2)

+-------------+-----------+----------------+-----+------------+-------------+-----------+
|    TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+-------------+-----------+----------------+-----+------------+-------------+-----------+
|10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|
|05-02-15 4:32|      20.89|            24.2|    0|       448.0|  0.003692263|          0|
+-------------+-----------+----------------+-----+------------+-------------+-----------+
only showing top 2 rows



In [11]:
# Drop row based on Null value
df_pyspark.na.drop().show()

+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|  ID|     TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|8041| 10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|
| 642| 05-02-15 4:32|      20.89|            24.2|    0|       448.0|  0.003692263|          0|
|1243|05-02-15 14:32|      22.23|           27.73|  441|       932.0|  0.004599115|          1|
|1803|05-02-15 23:53|      20.29|           21.29|    0|       443.0|  0.003127468|          0|
|3834| 07-02-15 9:43|      20.76|     18.85666667|  829| 452.6666667|  0.002850246|          0|
|3545| 07-02-15 4:54|     19.745|           19.39|    0|       441.0|  0.002752165|          0|
|  88|04-02-15 19:18|       22.1|          27.245|    0|       584.5|  0.004482195|          0|
|6005|08-02-15 21:55|       19.5|       

In [12]:
# any==how
# if full row has only na
#df_pyspark.na.drop(how='all')
df_pyspark.na.drop(how='any')

DataFrame[ID: int, TimeStamp: string, Temperature: double, RelativeHumidity: double, Light: int, OxygenLevels: double, HumidityRatio: double, GTOccupancy: int]

In [13]:
#thershold : at least this many value should present
df_pyspark.na.drop(thresh=2)

DataFrame[ID: int, TimeStamp: string, Temperature: double, RelativeHumidity: double, Light: int, OxygenLevels: double, HumidityRatio: double, GTOccupancy: int]

In [14]:
# Subset: if there is na in specific column drop that
df_pyspark.na.drop(how='any', subset=['Temperature'])

DataFrame[ID: int, TimeStamp: string, Temperature: double, RelativeHumidity: double, Light: int, OxygenLevels: double, HumidityRatio: double, GTOccupancy: int]

In [15]:
## Filling the missing value
df_pyspark.na.fill("ANy value")

DataFrame[ID: int, TimeStamp: string, Temperature: double, RelativeHumidity: double, Light: int, OxygenLevels: double, HumidityRatio: double, GTOccupancy: int]

In [20]:
# Imputer value

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['Temperature','RelativeHumidity','OxygenLevels'],
                 outputCols=["{}_imputed".format(c) for c in ['Temperature','RelativeHumidity','OxygenLevels']]
                 ).setStrategy("mean")

In [21]:
# Add imputation cols of df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+----+--------------+-----------+----------------+-----+------------+-------------+-----------+-------------------+------------------------+--------------------+
|  ID|     TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|Temperature_imputed|RelativeHumidity_imputed|OxygenLevels_imputed|
+----+--------------+-----------+----------------+-----+------------+-------------+-----------+-------------------+------------------------+--------------------+
|8041| 10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|              20.29|                  33.045|               449.0|
| 642| 05-02-15 4:32|      20.89|            24.2|    0|       448.0|  0.003692263|          0|              20.89|                    24.2|               448.0|
|1243|05-02-15 14:32|      22.23|           27.73|  441|       932.0|  0.004599115|          1|              22.23|                   27.73|               932.0|
|1803|05-02-15 23:53|      2

# FIlter Operations

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('filter').getOrCreate()

In [2]:
df_pyspark = spark.read.csv('/home/gourav/Documents/test.csv', header=True,
                           inferSchema=True)

In [3]:
df_pyspark.show(3)

+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|  ID|     TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|8041| 10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|
| 642| 05-02-15 4:32|      20.89|            24.2|    0|       448.0|  0.003692263|          0|
|1243|05-02-15 14:32|      22.23|           27.73|  441|       932.0|  0.004599115|          1|
+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
only showing top 3 rows



In [4]:
# Filter Operation
# Temp <=21
df_pyspark.filter('Temperature <=21').show(3)

+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|  ID|     TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|8041| 10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|
| 642| 05-02-15 4:32|      20.89|            24.2|    0|       448.0|  0.003692263|          0|
|1803|05-02-15 23:53|      20.29|           21.29|    0|       443.0|  0.003127468|          0|
+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
only showing top 3 rows



In [5]:
df_pyspark.filter('Temperature <=21').select(['RelativeHumidity','Light']).show()

+----------------+-----+
|RelativeHumidity|Light|
+----------------+-----+
|          33.045|    0|
|            24.2|    0|
|           21.29|    0|
|     18.85666667|  829|
|           19.39|    0|
|            27.6|    0|
|            33.9|    0|
|            27.2|   17|
|     31.16666667|   14|
|           27.79|    0|
|          18.745|    0|
|            26.2|    0|
|            18.6|    0|
|           23.29|    0|
|           18.89|    0|
|           18.89|    0|
|           22.89|    0|
|           25.15|    0|
|            33.0|    0|
|           19.39|    0|
+----------------+-----+
only showing top 20 rows



In [6]:
df_pyspark.filter((df_pyspark['Temperature']<=21)&
                  (df_pyspark['RelativeHumidity']>=25)).show()

+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|  ID|     TimeStamp|Temperature|RelativeHumidity|Light|OxygenLevels|HumidityRatio|GTOccupancy|
+----+--------------+-----------+----------------+-----+------------+-------------+-----------+
|8041| 10-02-15 7:51|      20.29|          33.045|    0|       449.0|  0.004867773|          0|
|6005|08-02-15 21:55|       19.5|            27.6|    0|       451.5|   0.00386517|          0|
|7400|09-02-15 21:10|       20.5|            33.9|    0|       740.0|  0.005060459|          0|
|5726|08-02-15 17:16|     19.445|            27.2|   17|       434.5|  0.003795723|          0|
|5221| 08-02-15 8:51|       19.2|     31.16666667|   14|       437.0|  0.004286778|          0|
|4593|07-02-15 22:23|      19.79|           27.79|    0|       437.0|  0.003963136|          0|
|6591| 09-02-15 7:40|       19.5|            26.2|    0|       473.0|  0.003667954|          0|
|4466|07-02-15 20:16|      20.05|       

# Group By and Aggregate 

In [7]:
# Groupby
df_pyspark.groupBy('OxygenLevels').sum().show()

+------------+-------+------------------+---------------------+----------+-----------------+--------------------+----------------+
|OxygenLevels|sum(ID)|  sum(Temperature)|sum(RelativeHumidity)|sum(Light)|sum(OxygenLevels)|  sum(HumidityRatio)|sum(GTOccupancy)|
+------------+-------+------------------+---------------------+----------+-----------------+--------------------+----------------+
|       451.5|   6005|              19.5|                 27.6|         0|            451.5|          0.00386517|               0|
|       868.5|   7362|              20.5|                34.59|         0|            868.5|         0.005164315|               0|
|       449.0|   8041|             20.29|               33.045|         0|            449.0|         0.004867773|               0|
|       443.0|   6269|             40.34|                46.44|         0|            886.0|         0.006770518|               0|
|       445.0|   5922|             19.39|                 27.5|         0|         

In [8]:
df_pyspark.groupBy('OxygenLevels').mean().show()

+------------+-------+------------------+---------------------+----------+-----------------+--------------------+----------------+
|OxygenLevels|avg(ID)|  avg(Temperature)|avg(RelativeHumidity)|avg(Light)|avg(OxygenLevels)|  avg(HumidityRatio)|avg(GTOccupancy)|
+------------+-------+------------------+---------------------+----------+-----------------+--------------------+----------------+
|       451.5| 6005.0|              19.5|                 27.6|       0.0|            451.5|          0.00386517|             0.0|
|       868.5| 7362.0|              20.5|                34.59|       0.0|            868.5|         0.005164315|             0.0|
|       449.0| 8041.0|             20.29|               33.045|       0.0|            449.0|         0.004867773|             0.0|
|       443.0| 3134.5|             20.17|                23.22|       0.0|            443.0|         0.003385259|             0.0|
|       445.0| 5922.0|             19.39|                 27.5|       0.0|         

In [9]:
df_pyspark.groupBy('OxygenLevels').count().show()

+------------+-----+
|OxygenLevels|count|
+------------+-----+
|       451.5|    1|
|       868.5|    1|
|       449.0|    1|
|       443.0|    2|
|       445.0|    1|
|       437.0|    2|
|       434.5|    2|
| 452.6666667|    1|
|       584.5|    1|
|       740.0|    1|
|       437.5|    1|
|       448.0|    2|
|     1002.75|    1|
|       435.0|    1|
|       429.5|    1|
|       473.0|    1|
|       433.0|    1|
|       483.5|    1|
|       780.5|    1|
| 431.6666667|    1|
+------------+-----+
only showing top 20 rows



In [10]:
# Aggregate
df_pyspark.agg({'OxygenLevels':'sum'}).show()

+------------------+
| sum(OxygenLevels)|
+------------------+
|21392.750000300002|
+------------------+



# MLib

<h3> DataFrame API </h3>

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Mlib').getOrCreate()

In [2]:
spark

In [3]:
#load Dataset
training = spark.read.csv('test.csv',header=True,inferSchema=True)

In [4]:
training.show()

+----+---+----------+------+
|Name|age|Experience|Salary|
+----+---+----------+------+
|  Gk| 25|         2|  2000|
|  An| 27|         3|  3000|
|  Ni| 24|         2|  2300|
|  Hu| 20|         1|  1000|
|  Ha| 30|        20|  8000|
|  Ki| 28|        12|  4500|
|  we| 34|        22|  9000|
| hum| 32|        17|  6000|
+----+---+----------+------+



In [5]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [6]:
training.columns

['Name', 'age', 'Experience', 'Salary']

### Group Independent Features
[Age, Experience] -----> new feature----->independent Feature

In [7]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=['age','Experience'],
                                  outputCol="Independent Features")

In [8]:
output = featureassembler.transform(training)

In [9]:
output.show()

+----+---+----------+------+--------------------+
|Name|age|Experience|Salary|Independent Features|
+----+---+----------+------+--------------------+
|  Gk| 25|         2|  2000|          [25.0,2.0]|
|  An| 27|         3|  3000|          [27.0,3.0]|
|  Ni| 24|         2|  2300|          [24.0,2.0]|
|  Hu| 20|         1|  1000|          [20.0,1.0]|
|  Ha| 30|        20|  8000|         [30.0,20.0]|
|  Ki| 28|        12|  4500|         [28.0,12.0]|
|  we| 34|        22|  9000|         [34.0,22.0]|
| hum| 32|        17|  6000|         [32.0,17.0]|
+----+---+----------+------+--------------------+



In [10]:
finalized_data = output.select("Independent Features","Salary")

In [11]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|          [25.0,2.0]|  2000|
|          [27.0,3.0]|  3000|
|          [24.0,2.0]|  2300|
|          [20.0,1.0]|  1000|
|         [30.0,20.0]|  8000|
|         [28.0,12.0]|  4500|
|         [34.0,22.0]|  9000|
|         [32.0,17.0]|  6000|
+--------------------+------+



In [12]:
from pyspark.ml.regression import LinearRegression
# train test split
train_data,test_data = finalized_data.randomSplit([0.75,0.25])
regressor = LinearRegression(featuresCol="Independent Features",
                             labelCol="Salary")
regressor = regressor.fit(train_data)

In [13]:
### Coefficients
regressor.coefficients

DenseVector([125.8502, 265.1165])

In [14]:
#Intercepts
regressor.intercept

-1491.5915915913697

In [15]:
#prediction
pred_result = regressor.evaluate(test_data)

In [16]:
pred_result.predictions.show()

+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [25.0,2.0]|  2000|2184.895706517314|
|         [28.0,12.0]|  4500|5213.610908205506|
+--------------------+------+-----------------+



In [17]:
pred_result.meanAbsoluteError, pred_result.meanSquaredError

(449.2533073614102, 271713.4752992122)