In [3]:
import pyspark
import pandas as pd

# Create the SparkSession used by every cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [4]:
# Bare expression: the notebook displays the session object's repr.
spark

In [3]:
# Baseline: load Book1.csv with pandas, for comparison with the Spark reads of the same file.
pd.read_csv('Book1.csv')

Unnamed: 0,Name,age
0,Ala,10
1,Ble,20
2,Cos,30


In [12]:
# pandas returns its own DataFrame class (this re-reads the file just to inspect the type).
type(pd.read_csv('Book1.csv'))

pandas.core.frame.DataFrame

In [4]:
# read dataset through spark (no options): the header line is read as an ordinary
# data row and the columns get the default names _c0, _c1
df_pyspark=spark.read.csv('Book1.csv')

In [5]:
# Bare expression shows only the column names and types, not the rows.
df_pyspark


DataFrame[_c0: string, _c1: string]

In [6]:
# show() prints the rows; the CSV header line shows up here as the first data row.
df_pyspark.show()

+-----+---+
|  _c0|_c1|
+-----+---+
|Name |age|
|  Ala| 10|
|  Ble| 20|
|  Cos| 30|
+-----+---+



In [10]:
# Treat the first CSV line as the header (column names) instead of a data row.
header_reader = spark.read.option('header', 'true')
df_pyspark = header_reader.csv('Book1.csv')
df_pyspark

DataFrame[Name : string, age: string]

In [8]:
# Header-aware read, printed as a table: the header line is no longer a data row.
(
    spark.read
    .option('header', 'true')
    .csv('Book1.csv')
    .show()
)

+-----+---+
|Name |age|
+-----+---+
|  Ala| 10|
|  Ble| 20|
|  Cos| 30|
+-----+---+



In [11]:
# A Spark DataFrame is a different class from the pandas DataFrame.
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [13]:
# head(n) returns the first n rows as a list of Row objects (not a DataFrame).
df_pyspark.head(3)

[Row(Name ='Ala', age='10'),
 Row(Name ='Ble', age='20'),
 Row(Name ='Cos', age='30')]

In [17]:
# Print the column tree; both columns are string here because this read set only the header option.
df_pyspark.printSchema()

root
 |-- Name : string (nullable = true)
 |-- age: string (nullable = true)



In [18]:
# List the column names. Note the trailing space in 'Name ': it must be typed exactly when referring to that column.
df_pyspark.columns

['Name ', 'age']

In [20]:
# select() returns a new DataFrame with just the requested column; no rows are printed without show().
df_pyspark.select('age')

DataFrame[age: string]

In [21]:
# Print the rows of the single selected column.
df_pyspark.select('age').show()

+---+
|age|
+---+
| 10|
| 20|
| 30|
+---+



In [22]:
# Select several columns; the output follows the order given here ('Name ' keeps its trailing space).
df_pyspark.select('age','Name ').show()

+---+-----+
|age|Name |
+---+-----+
| 10|  Ala|
| 20|  Ble|
| 30|  Cos|
+---+-----+



In [23]:
# Column types as a list of (column name, type name) tuples.
df_pyspark.dtypes

[('Name ', 'string'), ('age', 'string')]

In [30]:
# Re-read the file with schema inference so numeric columns get a numeric
# type instead of defaulting to string.
df_sparkk = spark.read.csv(
    'Book1.csv',
    header=True,
    inferSchema=True,
)

In [31]:
# With inferSchema, age is typed as int; 'Name ' stays string.
df_sparkk.dtypes

[('Name ', 'string'), ('age', 'int')]

In [32]:
# describe() returns a new DataFrame of summary statistics (its columns are typed as string); use show() to print it.
df_sparkk.describe()

DataFrame[summary: string, Name : string, age: string]

In [33]:
# count, mean, stddev, min and max per column; mean/stddev are null for the non-numeric 'Name ' column.
df_sparkk.describe().show()

+-------+-----+----+
|summary|Name | age|
+-------+-----+----+
|  count|    3|   3|
|   mean| null|20.0|
| stddev| null|10.0|
|    min|  Ala|  10|
|    max|  Cos|  30|
+-------+-----+----+



In [38]:
# add a column (based on an existing column)


In [40]:
# Derive 'Experience' as age minus 5. The result is only displayed; it is not
# assigned back, so df_sparkk keeps its original two columns.
experience_col = df_sparkk['age'] - 5
df_sparkk.withColumn('Experience', experience_col).show()

+-----+---+----------+
|Name |age|Experience|
+-----+---+----------+
|  Ala| 10|         5|
|  Ble| 20|        15|
|  Cos| 30|        25|
+-----+---+----------+



In [41]:
# drop column
# df_sparkk itself has no 'Experience' column (the earlier withColumn result was
# only displayed, never assigned), and DataFrame.drop() silently ignores column
# names that are not in the schema. Add the column first so the drop is
# actually exercised; the displayed result is the same two-column table.
df_with_experience = df_sparkk.withColumn('Experience', df_sparkk['age'] - 5)
df_with_experience.drop('Experience').show()

+-----+---+
|Name |age|
+-----+---+
|  Ala| 10|
|  Ble| 20|
|  Cos| 30|
+-----+---+



In [42]:
# rename a column; the existing name 'Name ' must include its trailing space.
# The renamed DataFrame is only displayed, not assigned back to df_sparkk.
df_sparkk.withColumnRenamed('Name ','New name').show()

+--------+---+
|New name|age|
+--------+---+
|     Ala| 10|
|     Ble| 20|
|     Cos| 30|
+--------+---+



## Data cleaning

In [5]:
# Load Book2.csv (used for the cleaning and filtering examples) with a header row and inferred column types.
df_pyspark2=spark.read.csv('Book2.csv',header=True,inferSchema=True)

In [46]:
# Preview the raw data; every column contains at least one null.
df_pyspark2.show()

+-----+----+----------+------+
|Name | age|Experience|Salary|
+-----+----+----------+------+
|  Ala|  10|         1| 30000|
|  Ble|  20|         2| 25000|
|  Cos|  30|         4| 20000|
|  Dse|  40|         8| 18000|
|  Eve|  50|         1| 22000|
|  Far|  60|         2| 23000|
|  Gor|null|      null| 40000|
| null|  70|        10| 38000|
| null|  80|      null|  null|
+-----+----+----------+------+



In [47]:
# Drop every row that contains at least one null value.
df_pyspark2.dropna().show()

+-----+---+----------+------+
|Name |age|Experience|Salary|
+-----+---+----------+------+
|  Ala| 10|         1| 30000|
|  Ble| 20|         2| 25000|
|  Cos| 30|         4| 20000|
|  Dse| 40|         8| 18000|
|  Eve| 50|         1| 22000|
|  Far| 60|         2| 23000|
+-----+---+----------+------+



In [50]:
# Fill missing values with a placeholder. Because the replacement is a string,
# only nulls in string columns ('Name ' here) are filled; nulls in the numeric
# columns are left as they are.
df_pyspark2.fillna('Missing value').show()

+-------------+----+----------+------+
|        Name | age|Experience|Salary|
+-------------+----+----------+------+
|          Ala|  10|         1| 30000|
|          Ble|  20|         2| 25000|
|          Cos|  30|         4| 20000|
|          Dse|  40|         8| 18000|
|          Eve|  50|         1| 22000|
|          Far|  60|         2| 23000|
|          Gor|null|      null| 40000|
|Missing value|  70|        10| 38000|
|Missing value|  80|      null|  null|
+-------------+----+----------+------+



In [13]:
# Fill the remaining (numeric) nulls with the mean of their column.
from pyspark.ml.feature import Imputer

# Each input column gets a companion "<name>_imputed" output column.
numeric_cols = ['age', 'Experience', 'Salary']
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=["{}_imputed".format(col_name) for col_name in numeric_cols],
)
imputer.setStrategy("mean")

In [18]:
# Fit the imputer on the data, then append the *_imputed columns next to the
# original columns (the originals keep their nulls).
imputer_model = imputer.fit(df_pyspark2)
imputer_model.transform(df_pyspark2).show()

+-----+----+----------+------+-----------+------------------+--------------+
|Name | age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+-----+----+----------+------+-----------+------------------+--------------+
|  Ala|  10|         1| 30000|         10|                 1|         30000|
|  Ble|  20|         2| 25000|         20|                 2|         25000|
|  Cos|  30|         4| 20000|         30|                 4|         20000|
|  Dse|  40|         8| 18000|         40|                 8|         18000|
|  Eve|  50|         1| 22000|         50|                 1|         22000|
|  Far|  60|         2| 23000|         60|                 2|         23000|
|  Gor|null|      null| 40000|         45|                 4|         40000|
| null|  70|        10| 38000|         70|                10|         38000|
| null|  80|      null|  null|         80|                 4|         27000|
+-----+----+----------+------+-----------+------------------+--------------+

## Filter Operation

In [19]:
# filter() accepts a SQL-style condition string: keep rows with Salary <= 20000.
df_pyspark2.filter("Salary<=20000").show()

+-----+---+----------+------+
|Name |age|Experience|Salary|
+-----+---+----------+------+
|  Cos| 30|         4| 20000|
|  Dse| 40|         8| 18000|
+-----+---+----------+------+



In [20]:
# Same filter, then keep only the age and Experience columns of the matching rows.
df_pyspark2.filter("Salary<=20000").select(['age','Experience']).show()

+---+----------+
|age|Experience|
+---+----------+
| 30|         4|
| 40|         8|
+---+----------+



In [21]:
# Combine two column conditions with & (logical AND): low salary and more than
# five years of experience.
low_salary = df_pyspark2['Salary'] <= 20000
experienced = df_pyspark2['Experience'] > 5
df_pyspark2.filter(low_salary & experienced).show()

+-----+---+----------+------+
|Name |age|Experience|Salary|
+-----+---+----------+------+
|  Dse| 40|         8| 18000|
+-----+---+----------+------+



In [23]:
# ~ negates the condition: keep rows where Salary is NOT <= 20000.
# The row whose Salary is null is also absent from the result, because a
# comparison against null does not evaluate to true even when negated.
df_pyspark2.filter(~(df_pyspark2['Salary']<=20000)).show()

+-----+----+----------+------+
|Name | age|Experience|Salary|
+-----+----+----------+------+
|  Ala|  10|         1| 30000|
|  Ble|  20|         2| 25000|
|  Eve|  50|         1| 22000|
|  Far|  60|         2| 23000|
|  Gor|null|      null| 40000|
| null|  70|        10| 38000|
+-----+----+----------+------+



## Group by & Aggregate Functions

In [24]:
# Load the Name / Dep / Salary dataset with a header row and inferred column
# types, then preview it.
df_pyspark3 = spark.read.csv(
    'Book3.csv',
    header=True,
    inferSchema=True,
)
df_pyspark3.show()

+----+---+------+
|Name|Dep|Salary|
+----+---+------+
| Kri| DS| 10000|
| Kri|IOT|  5000|
| Mah| BD|  4000|
| Kri| BD|  4000|
| Mah| DS|  3000|
| Sus| DS| 20000|
| Sus|IOT| 10000|
| Sus| BD|  5000|
| Uny| DS| 10000|
| Uny| BD|  2000|
+----+---+------+



In [25]:
## Group by
# Total per Name; sum() with no arguments is applied to the numeric columns (only Salary here).
df_pyspark3.groupBy('Name').sum().show()

+----+-----------+
|Name|sum(Salary)|
+----+-----------+
| Kri|      19000|
| Uny|      12000|
| Sus|      35000|
| Mah|       7000|
+----+-----------+



In [26]:
# Average Salary per department; the result column is named avg(Salary).
df_pyspark3.groupBy('Dep').mean().show()

+---+-----------+
|Dep|avg(Salary)|
+---+-----------+
|IOT|     7500.0|
| BD|     3750.0|
| DS|    10750.0|
+---+-----------+



In [27]:
# Aggregate directly over the whole DataFrame (no grouping): a {column: function}
# dict, here the total of Salary.
df_pyspark3.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



## Example of PysparkML

In [31]:
# Read the dataset for the regression example, with a header row and inferred column types.
py_spark4=spark.read.csv('Book4.csv',header=True,inferSchema=True)


In [32]:
# Preview the data before modelling.
py_spark4.show()
# Goal: use age and Experience as features to predict Salary

+----+---+----------+------+
|Name|age|Experience|Salary|
+----+---+----------+------+
| Ala| 31|        10| 30000|
| Blc| 30|         8| 25000|
| Clo| 29|         4| 20000|
| Dre| 24|         3| 20000|
| Eve| 21|         2| 15000|
| Flo| 23|         1| 18000|
+----+---+----------+------+



In [35]:
# Combine the independent variables (features) into a single vector column,
# which is the input format the regression model is configured to read.
from pyspark.ml.feature import VectorAssembler

featureassembler = VectorAssembler(
    inputCols=['age', 'Experience'],
    outputCol='AgeExp',
)

In [36]:
# Append the assembled 'AgeExp' vector column to the original columns.
output=featureassembler.transform(py_spark4)

In [37]:
# Each AgeExp value is the [age, Experience] pair of its row, as floats.
output.show()

+----+---+----------+------+-----------+
|Name|age|Experience|Salary|     AgeExp|
+----+---+----------+------+-----------+
| Ala| 31|        10| 30000|[31.0,10.0]|
| Blc| 30|         8| 25000| [30.0,8.0]|
| Clo| 29|         4| 20000| [29.0,4.0]|
| Dre| 24|         3| 20000| [24.0,3.0]|
| Eve| 21|         2| 15000| [21.0,2.0]|
| Flo| 23|         1| 18000| [23.0,1.0]|
+----+---+----------+------+-----------+



In [38]:
# Keep only what the model needs: the feature vector (AgeExp) and the label (Salary).
finaldata=output.select('AgeExp','Salary')
finaldata.show()

+-----------+------+
|     AgeExp|Salary|
+-----------+------+
|[31.0,10.0]| 30000|
| [30.0,8.0]| 25000|
| [29.0,4.0]| 20000|
| [24.0,3.0]| 20000|
| [21.0,2.0]| 15000|
| [23.0,1.0]| 18000|
+-----------+------+



In [39]:
# Use LR model
from pyspark.ml.regression import LinearRegression

# Seed the split so the train/test partition -- and therefore the fitted
# coefficients and error metrics -- is reproducible across runs.
# NOTE: outputs recorded from an earlier, unseeded run may differ from a re-run.
train_data, test_data = finaldata.randomSplit([0.75, 0.25], seed=42)

# Keep the unfitted estimator under its own name; `regressor` refers only to
# the fitted model, which is what the following cells read (.coefficients,
# .intercept, .evaluate).
lr_estimator = LinearRegression(featuresCol='AgeExp', labelCol='Salary')
regressor = lr_estimator.fit(train_data)

In [43]:
## coefficient & intercept
# One coefficient per feature in the AgeExp vector (age, Experience), followed by the intercept.
regressor.coefficients,regressor.intercept


(DenseVector([1047.1859, 296.191]), -6839.1131324643975)

In [44]:
# prediction
# evaluate() scores the fitted model on the held-out test split; .predictions
# holds the test rows plus a 'prediction' column.
pred_results=regressor.evaluate(test_data)
pred_results.predictions.show()

+----------+------+------------------+
|    AgeExp|Salary|        prediction|
+----------+------+------------------+
|[29.0,4.0]| 20000|24714.042069358118|
+----------+------+------------------+



In [45]:
# Other metrics from the same evaluation: mean absolute error and mean squared error on the test split.
pred_results.meanAbsoluteError, pred_results.meanSquaredError


(4714.042069358118, 22222192.631678168)