## Tutorial 1 : Basics

In [1]:
import pyspark

In [3]:
import pandas as pd
pd.read_csv('test1.csv')

Unnamed: 0,Name,Age
0,JF,38
1,Tina,35
2,Noah,3
3,Wendyam,2


In [4]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Practice').getOrCreate()

In [5]:
spark

In [None]:
df_pyspark=spark.read.csv('test1.csv')

In [8]:
df_pyspark.show()

+-------+---+
|    _c0|_c1|
+-------+---+
|   Name|Age|
|     JF| 38|
|   Tina| 35|
|   Noah|  3|
|Wendyam|  2|
+-------+---+



In [12]:
df_pyspark=spark.read.option('header','true').csv('test1.csv', inferSchema=True)
df_pyspark.show()

+-------+---+
|   Name|Age|
+-------+---+
|     JF| 38|
|   Tina| 35|
|   Noah|  3|
|Wendyam|  2|
+-------+---+



In [13]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [11]:
df_pyspark.show(2)

+----+---+
|Name|Age|
+----+---+
|  JF| 38|
|Tina| 35|
+----+---+
only showing top 2 rows



## Tutorial 2: Pyspark DataFrames

In [14]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)



In [15]:
df_pyspark=spark.read.csv('test1.csv',header=True, inferSchema=True)
df_pyspark.show()

+-------+---+
|   Name|Age|
+-------+---+
|     JF| 38|
|   Tina| 35|
|   Noah|  3|
|Wendyam|  2|
+-------+---+



In [17]:
df_pyspark.head(3)

[Row(Name='JF', Age=38), Row(Name='Tina', Age=35), Row(Name='Noah', Age=3)]

In [21]:
df_pyspark.select('Name').show()

+-------+
|   Name|
+-------+
|     JF|
|   Tina|
|   Noah|
|Wendyam|
+-------+



In [23]:
df_pyspark.select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|     JF| 38|
|   Tina| 35|
|   Noah|  3|
|Wendyam|  2|
+-------+---+



In [24]:
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int')]

In [26]:
df_pyspark.describe().show()

+-------+-------+---------------+
|summary|   Name|            Age|
+-------+-------+---------------+
|  count|      4|              4|
|   mean|   NULL|           19.5|
| stddev|   NULL|19.672315572906|
|    min|     JF|              2|
|    max|Wendyam|             38|
+-------+-------+---------------+



In [27]:
##adding colums
df_pyspark.withColumn('Life Experience',df_pyspark['Age']+10).show()

+-------+---+---------------+
|   Name|Age|Life Experience|
+-------+---+---------------+
|     JF| 38|             48|
|   Tina| 35|             45|
|   Noah|  3|             13|
|Wendyam|  2|             12|
+-------+---+---------------+



In [28]:
df_pyspark=df_pyspark.withColumn('Life Experience',df_pyspark['Age']+10)


In [29]:
df_pyspark.show()

+-------+---+---------------+
|   Name|Age|Life Experience|
+-------+---+---------------+
|     JF| 38|             48|
|   Tina| 35|             45|
|   Noah|  3|             13|
|Wendyam|  2|             12|
+-------+---+---------------+



In [30]:
#drop columns
df_pyspark.drop('Life Experience').show()

+-------+---+
|   Name|Age|
+-------+---+
|     JF| 38|
|   Tina| 35|
|   Noah|  3|
|Wendyam|  2|
+-------+---+



In [33]:
### Rename columns
df_pyspark.withColumnRenamed('Age','New').show()

+-------+---+---------------+
|   Name|New|Life Experience|
+-------+---+---------------+
|     JF| 38|             48|
|   Tina| 35|             45|
|   Noah|  3|             13|
|Wendyam|  2|             12|
+-------+---+---------------+



## Tutorial 3 : handling missing values and formating

In [34]:
df_pyspark=spark.read.csv('text2.csv', header=True, inferSchema=True)

In [35]:
df_pyspark.show()

+-------+----+-----------+------+
|   Name| Age|Experience |Salary|
+-------+----+-----------+------+
|   john|  31|         10|150000|
|     jf|  28|          9|167000|
|  wendy|  25|          3| 98000|
|   sidd|  40|         10|100000|
|  vyoma|  45|         15|198000|
|abinaya|  20|          2| 80000|
| esdras|NULL|       NULL|150000|
|  mussa|  17|          1| 60000|
|   kyky|  22|          3| 55000|
|   NULL|  18|       NULL| 25000|
|   NULL|  23|          4|  NULL|
+-------+----+-----------+------+



In [36]:
#remove missing values
df_pyspark.na.drop().show()

+-------+---+-----------+------+
|   Name|Age|Experience |Salary|
+-------+---+-----------+------+
|   john| 31|         10|150000|
|     jf| 28|          9|167000|
|  wendy| 25|          3| 98000|
|   sidd| 40|         10|100000|
|  vyoma| 45|         15|198000|
|abinaya| 20|          2| 80000|
|  mussa| 17|          1| 60000|
|   kyky| 22|          3| 55000|
+-------+---+-----------+------+



In [38]:
#remove missing values
df_pyspark.na.drop(how='any').show()

+-------+---+-----------+------+
|   Name|Age|Experience |Salary|
+-------+---+-----------+------+
|   john| 31|         10|150000|
|     jf| 28|          9|167000|
|  wendy| 25|          3| 98000|
|   sidd| 40|         10|100000|
|  vyoma| 45|         15|198000|
|abinaya| 20|          2| 80000|
|  mussa| 17|          1| 60000|
|   kyky| 22|          3| 55000|
+-------+---+-----------+------+



In [41]:
#threshold
df_pyspark.na.drop(how='any',thresh=3).show()

+-------+---+-----------+------+
|   Name|Age|Experience |Salary|
+-------+---+-----------+------+
|   john| 31|         10|150000|
|     jf| 28|          9|167000|
|  wendy| 25|          3| 98000|
|   sidd| 40|         10|100000|
|  vyoma| 45|         15|198000|
|abinaya| 20|          2| 80000|
|  mussa| 17|          1| 60000|
|   kyky| 22|          3| 55000|
+-------+---+-----------+------+



In [43]:
#subset
df_pyspark.na.drop(how='any',subset=['Experience ']).show()

+-------+---+-----------+------+
|   Name|Age|Experience |Salary|
+-------+---+-----------+------+
|   john| 31|         10|150000|
|     jf| 28|          9|167000|
|  wendy| 25|          3| 98000|
|   sidd| 40|         10|100000|
|  vyoma| 45|         15|198000|
|abinaya| 20|          2| 80000|
|  mussa| 17|          1| 60000|
|   kyky| 22|          3| 55000|
|   NULL| 23|          4|  NULL|
+-------+---+-----------+------+



In [44]:
#Filling the missing values
df_pyspark.na.fill('N/A').show()

+-------+----+-----------+------+
|   Name| Age|Experience |Salary|
+-------+----+-----------+------+
|   john|  31|         10|150000|
|     jf|  28|          9|167000|
|  wendy|  25|          3| 98000|
|   sidd|  40|         10|100000|
|  vyoma|  45|         15|198000|
|abinaya|  20|          2| 80000|
| esdras|NULL|       NULL|150000|
|  mussa|  17|          1| 60000|
|   kyky|  22|          3| 55000|
|    N/A|  18|       NULL| 25000|
|    N/A|  23|          4|  NULL|
+-------+----+-----------+------+



In [45]:
df_pyspark.na.fill('N/A',['Age']).show()

+-------+----+-----------+------+
|   Name| Age|Experience |Salary|
+-------+----+-----------+------+
|   john|  31|         10|150000|
|     jf|  28|          9|167000|
|  wendy|  25|          3| 98000|
|   sidd|  40|         10|100000|
|  vyoma|  45|         15|198000|
|abinaya|  20|          2| 80000|
| esdras|NULL|       NULL|150000|
|  mussa|  17|          1| 60000|
|   kyky|  22|          3| 55000|
|   NULL|  18|       NULL| 25000|
|   NULL|  23|          4|  NULL|
+-------+----+-----------+------+



In [46]:
df_pyspark.columns

['Name', 'Age', 'Experience ', 'Salary']

In [47]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience : integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [50]:
#fill missing values with median, mean, ....
from pyspark.ml.feature import Imputer

imputer = Imputer(

    inputCols=['Age', 'Experience ', 'Salary'],
    outputCols=["{}_imputed".format(c) for c in ['Age', 'Experience ', 'Salary']]
).setStrategy("mean")

In [51]:
imputer.fit(df_pyspark).transform(df_pyspark).show()

+-------+----+-----------+------+-----------+-------------------+--------------+
|   Name| Age|Experience |Salary|Age_imputed|Experience _imputed|Salary_imputed|
+-------+----+-----------+------+-----------+-------------------+--------------+
|   john|  31|         10|150000|         31|                 10|        150000|
|     jf|  28|          9|167000|         28|                  9|        167000|
|  wendy|  25|          3| 98000|         25|                  3|         98000|
|   sidd|  40|         10|100000|         40|                 10|        100000|
|  vyoma|  45|         15|198000|         45|                 15|        198000|
|abinaya|  20|          2| 80000|         20|                  2|         80000|
| esdras|NULL|       NULL|150000|         26|                  6|        150000|
|  mussa|  17|          1| 60000|         17|                  1|         60000|
|   kyky|  22|          3| 55000|         22|                  3|         55000|
|   NULL|  18|       NULL| 2

In [52]:
spark=SparkSession.builder.appName('DataFrame').getOrCreate()

In [53]:
df=spark.read.csv('text2.csv', header=True,inferSchema=True)

## Filter operation

In [55]:
###Get salary less than 140K
df.filter('Salary<=140000').show()

+-------+---+-----------+------+
|   Name|Age|Experience |Salary|
+-------+---+-----------+------+
|  wendy| 25|          3| 98000|
|   sidd| 40|         10|100000|
|abinaya| 20|          2| 80000|
|  mussa| 17|          1| 60000|
|   kyky| 22|          3| 55000|
|   NULL| 18|       NULL| 25000|
+-------+---+-----------+------+



In [56]:
###Get salary less than 140K for only name and agd
df.filter('Salary<=140000').select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|  wendy| 25|
|   sidd| 40|
|abinaya| 20|
|  mussa| 17|
|   kyky| 22|
|   NULL| 18|
+-------+---+



In [57]:
###Get salary less than 140K for only name and agd
df.filter(df['Salary']<=140000).select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|  wendy| 25|
|   sidd| 40|
|abinaya| 20|
|  mussa| 17|
|   kyky| 22|
|   NULL| 18|
+-------+---+



In [59]:
df.filter((df['Salary']<=140000) & (df['Salary']>80000)).select(['Name','Age']).show()

+-----+---+
| Name|Age|
+-----+---+
|wendy| 25|
| sidd| 40|
+-----+---+



In [61]:
df.filter((df['Salary']<=140000) & (df['Age']>27)).show()

+----+---+-----------+------+
|Name|Age|Experience |Salary|
+----+---+-----------+------+
|sidd| 40|         10|100000|
+----+---+-----------+------+



## group by and aggregate functions

In [62]:
spark

In [64]:
df=spark.read.csv('text3.csv',header=True,inferSchema=True)
df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
|   jd|        AI| 54165|
|vyoma|        DS|   656|
| sidd|        DS|    65|
|amesh|        AI|     2|
|hudds|        AI|    68|
|  bra|        DS|    65|
| dfas|        tt|    65|
| dfas|        tt|    32|
|  dfd|        AI|   695|
+-----+----------+------+



In [65]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [67]:
#Group by Name 
df.groupBy(df['Name']).sum().show()

+-----+-----------+
| Name|sum(Salary)|
+-----+-----------+
|   jd|      54165|
|hudds|         68|
|  dfd|        695|
|amesh|          2|
| dfas|         97|
|  bra|         65|
|vyoma|        656|
| sidd|         65|
+-----+-----------+



In [69]:
df.groupBy('Department').sum().show()

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|        tt|         97|
|        AI|      54930|
|        DS|        786|
+----------+-----------+



In [70]:
df.groupBy('Department').max().show()

+----------+-----------+
|Department|max(Salary)|
+----------+-----------+
|        tt|         65|
|        AI|      54165|
|        DS|        656|
+----------+-----------+



In [71]:
df.groupBy('Department').mean().show()

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|        tt|       48.5|
|        AI|    13732.5|
|        DS|      262.0|
+----------+-----------+



In [72]:
df.groupBy('Department').count().show()

+----------+-----+
|Department|count|
+----------+-----+
|        tt|    2|
|        AI|    4|
|        DS|    3|
+----------+-----+



In [73]:
df.agg({'salary':'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|      55813|
+-----------+



In [75]:
df.groupBy('Department').agg({'salary':'sum','Name':'count'}).show()

+----------+-----------+-----------+
|Department|sum(salary)|count(Name)|
+----------+-----------+-----------+
|        tt|         97|          2|
|        AI|      54930|          4|
|        DS|        786|          3|
+----------+-----------+-----------+



In [76]:
df.show()

+-----+----------+------+
| Name|Department|Salary|
+-----+----------+------+
|   jd|        AI| 54165|
|vyoma|        DS|   656|
| sidd|        DS|    65|
|amesh|        AI|     2|
|hudds|        AI|    68|
|  bra|        DS|    65|
| dfas|        tt|    65|
| dfas|        tt|    32|
|  dfd|        AI|   695|
+-----+----------+------+



##  Pyspark ML

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Missing').getOrCreate()

In [3]:
df=spark.read.csv('test_4.csv', header=True, inferSchema=True)
df.show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|   john| 50|        10| 35000|
|   sidd| 32|        15| 55000|
|  vyoma| 65|        20| 65000|
|    pat| 30|         8|100000|
|   tina| 22|         9|136562|
|   noah| 66|        24|158322|
|wendyam| 33|        10| 25000|
| solame| 22|        10|158000|
| esdras| 44|        20| 25651|
|  elisa| 55|        23|241455|
+-------+---+----------+------+



In [4]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [5]:
df.columns

['Name', 'Age', 'Experience', 'Salary']

In [None]:
['Age', 'Experience'] ---- new feature --- independant feature

In [6]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['Age', 'Experience'],outputCol="Independant Feature")

In [7]:
output=featureassembler.transform(df)

In [9]:
output.show()

+-------+---+----------+------+-------------------+
|   Name|Age|Experience|Salary|Independant Feature|
+-------+---+----------+------+-------------------+
|   john| 50|        10| 35000|        [50.0,10.0]|
|   sidd| 32|        15| 55000|        [32.0,15.0]|
|  vyoma| 65|        20| 65000|        [65.0,20.0]|
|    pat| 30|         8|100000|         [30.0,8.0]|
|   tina| 22|         9|136562|         [22.0,9.0]|
|   noah| 66|        24|158322|        [66.0,24.0]|
|wendyam| 33|        10| 25000|        [33.0,10.0]|
| solame| 22|        10|158000|        [22.0,10.0]|
| esdras| 44|        20| 25651|        [44.0,20.0]|
|  elisa| 55|        23|241455|        [55.0,23.0]|
+-------+---+----------+------+-------------------+



In [10]:
finalized_data=output.select("Salary","Independant Feature")
finalized_data.show()

+------+-------------------+
|Salary|Independant Feature|
+------+-------------------+
| 35000|        [50.0,10.0]|
| 55000|        [32.0,15.0]|
| 65000|        [65.0,20.0]|
|100000|         [30.0,8.0]|
|136562|         [22.0,9.0]|
|158322|        [66.0,24.0]|
| 25000|        [33.0,10.0]|
|158000|        [22.0,10.0]|
| 25651|        [44.0,20.0]|
|241455|        [55.0,23.0]|
+------+-------------------+



In [11]:
from pyspark.ml.regression import LinearRegression
train_data,test_data=finalized_data.randomSplit([.75,.25])
regressor=LinearRegression(featuresCol='Independant Feature', labelCol='Salary')
regressor=regressor.fit(train_data)

In [12]:
regressor.coefficients

DenseVector([-1581.178, 6728.1143])

In [13]:
pre_results=regressor.evaluate(test_data)

In [16]:
pre_results.predictions.show()

+------+-------------------+-----------------+
|Salary|Independant Feature|       prediction|
+------+-------------------+-----------------+
| 35000|        [50.0,10.0]|56359.66441102744|
+------+-------------------+-----------------+



In [17]:
pre_results.meanAbsoluteError,pre_results.meanSquaredError

(21359.664411027443, 456235263.7517123)