In [4]:
!pip install pyspark



In [5]:
import pyspark

In [6]:
import pandas as pd

df = pd.read_csv("/content/pysparkexamples.csv")

In [7]:
df.head()

Unnamed: 0,Name,age
0,Krish,31
1,Sudhansh,30
2,Sunny,29


In [8]:
from pyspark.sql import SparkSession

In [9]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [10]:
spark

In [11]:
df_pyspark = spark.read.csv('/content/pysparkexamples.csv')

In [12]:
df_pyspark

DataFrame[_c0: string, _c1: string]

In [13]:
df_pyspark.show() #to see the entire dataset

+--------+---+
|     _c0|_c1|
+--------+---+
|    Name|age|
|   Krish| 31|
|Sudhansh| 30|
|   Sunny| 29|
+--------+---+



In [14]:
spark.read.option('header','true').csv('/content/pysparkexamples.csv') #to ensure the first column is my header

DataFrame[Name: string, age: string]

In [15]:
spark.read.option('header','true').csv('/content/pysparkexamples.csv').show() #To ensure that the first columsn is my header

+--------+---+
|    Name|age|
+--------+---+
|   Krish| 31|
|Sudhansh| 30|
|   Sunny| 29|
+--------+---+



In [16]:
df_pyspark = spark.read.option('header','true').csv('/content/pysparkexamples.csv')

In [17]:
type(df_pyspark)

In [18]:
df_pyspark.head(3)

[Row(Name='Krish', age='31'),
 Row(Name='Sudhansh', age='30'),
 Row(Name='Sunny', age='29')]

In [19]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)



Pyspark Dataframe
Reading the dataset
checking the datatypes of the column(schema)
selecting columns and Indexing
checking describe similar to pandas
Adding columns
dropping columns

In [20]:
from pyspark.sql import SparkSession

In [21]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate() #to start a session

In [22]:
spark #tells us the version

In [23]:
##to read the dataset
df_pyspark = spark.read.option('header','true').csv('/content/test1.csv').show()

+--------+---+-----------+
|    Name|age|Experience |
+--------+---+-----------+
|   Krish| 31|         10|
|Sudhansh| 30|          8|
|   Sunny| 29|          4|
+--------+---+-----------+



In [24]:
##to read the dataset
df_pyspark = spark.read.option('header','true').csv('/content/test1.csv', inferSchema=True)

In [25]:
##to check the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience : integer (nullable = true)



In [26]:
##to read the dataset
df_pyspark = spark.read.csv('/content/test1.csv',header=True,inferSchema=True)
df_pyspark.show()

+--------+---+-----------+
|    Name|age|Experience |
+--------+---+-----------+
|   Krish| 31|         10|
|Sudhansh| 30|          8|
|   Sunny| 29|          4|
+--------+---+-----------+



In [27]:
type(df_pyspark)

In [28]:
df_pyspark.columns

['Name', 'age', 'Experience ']

In [29]:
df_pyspark.show()

+--------+---+-----------+
|    Name|age|Experience |
+--------+---+-----------+
|   Krish| 31|         10|
|Sudhansh| 30|          8|
|   Sunny| 29|          4|
+--------+---+-----------+



In [30]:
#to see a specific column
df_pyspark.select('Name').show()

+--------+
|    Name|
+--------+
|   Krish|
|Sudhansh|
|   Sunny|
+--------+



In [31]:
type(df_pyspark.select('Name'))

In [32]:
#to select two columns
df_pyspark.show()

+--------+---+-----------+
|    Name|age|Experience |
+--------+---+-----------+
|   Krish| 31|         10|
|Sudhansh| 30|          8|
|   Sunny| 29|          4|
+--------+---+-----------+



In [33]:
df_pyspark.select('Name')

DataFrame[Name: string]

In [34]:
df_pyspark.select('Name').show()

+--------+
|    Name|
+--------+
|   Krish|
|Sudhansh|
|   Sunny|
+--------+



In [35]:
type(df_pyspark.select('Name'))

In [36]:
df_pyspark['Name']

Column<'Name'>

In [37]:
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience ', 'int')]

In [38]:
df_pyspark.describe().show()

+-------+-----+----+-----------------+
|summary| Name| age|      Experience |
+-------+-----+----+-----------------+
|  count|    3|   3|                3|
|   mean| NULL|30.0|7.333333333333333|
| stddev| NULL| 1.0|3.055050463303893|
|    min|Krish|  29|                4|
|    max|Sunny|  31|               10|
+-------+-----+----+-----------------+



##Adding columns

In [41]:
df_pyspark.withColumn('Experience After 2 years',df_pyspark['Experience ']+2).show()

+--------+---+-----------+------------------------+
|    Name|age|Experience |Experience After 2 years|
+--------+---+-----------+------------------------+
|   Krish| 31|         10|                      12|
|Sudhansh| 30|          8|                      10|
|   Sunny| 29|          4|                       6|
+--------+---+-----------+------------------------+



In [42]:
##Drop the columns
df_pyspark.drop('Experience After 2 years')

DataFrame[Name: string, age: int, Experience : int]

In [43]:
#to show the dropped columns
df_pyspark.drop('Experience After 2 years').show()

+--------+---+-----------+
|    Name|age|Experience |
+--------+---+-----------+
|   Krish| 31|         10|
|Sudhansh| 30|          8|
|   Sunny| 29|          4|
+--------+---+-----------+



In [44]:
df_pyspark.show()

+--------+---+-----------+
|    Name|age|Experience |
+--------+---+-----------+
|   Krish| 31|         10|
|Sudhansh| 30|          8|
|   Sunny| 29|          4|
+--------+---+-----------+



##Pyspar Handling Missing values
#Drop Rows
#Various parameters in Dropping functionalities
#Handling Missing values by Mean, Median and Mode


In [45]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [53]:
df_pyspark= spark.read.csv('/content/test2.csv',header=True,inferSchema=True)

In [54]:
df_pyspark.show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [55]:
#tp drop the columns
df_pyspark.drop('Name')

DataFrame[age: int, Experience : int, Salary: int]

In [56]:
#to drop null values
df_pyspark.na.drop().show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [57]:
##drop any how
df_pyspark.na.drop(how='any').show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [58]:
##threshold to drop the three null in a row
df_pyspark.na.drop(how='any',thresh=3).show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [59]:
##ubset
df_pyspark.na.drop(how='any',subset=['Age']).show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [60]:
##subset
df_pyspark.na.drop(how='any',subset=['Experience ']).show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [61]:
##filling the missing value. Replacing the missing values in Experience column with Missing values
df_pyspark.na.fill('Missing Values',['Experience ']).show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [67]:
##filling missing values for two columns
df_pyspark.na.fill('Missing Values',['Name','age']).show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [70]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience '],
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience ']]
    ).setStrategy("median")    #to replace the null in age and experience with median

In [71]:
#Add imputation cols to df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+--------+---+-----------+------+-----------+-------------------+
|    Name|age|Experience |Salary|age_imputed|Experience _imputed|
+--------+---+-----------+------+-----------+-------------------+
|   Krish| 31|         10| 30000|         31|                 10|
|Sudhansh| 30|          8| 25000|         30|                  8|
|   Sunny| 29|          4| 15000|         29|                  4|
|    Paul| 31|          7| 19000|         31|                  7|
|   James| 42|          5| 24000|         42|                  5|
+--------+---+-----------+------+-----------+-------------------+



Pyspark Dataframes

filter operation
&,|,==
~

In [72]:
from pyspark.sql import SparkSession

In [73]:
spark = SparkSession.builder.appName('dataframe').getOrCreate()

In [75]:
df_pyspark = spark.read.csv('/content/test1.csv',header=True,inferSchema=True)

In [76]:
df_pyspark.show()

+--------+---+-----------+
|    Name|age|Experience |
+--------+---+-----------+
|   Krish| 31|         10|
|Sudhansh| 30|          8|
|   Sunny| 29|          4|
+--------+---+-----------+



##Filter Operations

In [78]:
df_pyspark = spark.read.csv('/content/test2.csv',header=True,inferSchema=True)

In [79]:
##salary of people less than or equal 20000
df_pyspark.filter('Salary<=20000').show()

+-----+---+-----------+------+
| Name|age|Experience |Salary|
+-----+---+-----------+------+
|Sunny| 29|          4| 15000|
| Paul| 31|          7| 19000|
+-----+---+-----------+------+



In [80]:
#filter nsmr and age saalary greater than 20000
df_pyspark.filter('Salary<=20000').select(['Name','age']).show()

+-----+---+
| Name|age|
+-----+---+
|Sunny| 29|
| Paul| 31|
+-----+---+



In [81]:
df_pyspark.filter(df_pyspark['Salary']<=20000).show()

+-----+---+-----------+------+
| Name|age|Experience |Salary|
+-----+---+-----------+------+
|Sunny| 29|          4| 15000|
| Paul| 31|          7| 19000|
+-----+---+-----------+------+



In [82]:
#and operation
df_pyspark.filter((df_pyspark['Salary']<=20000) & (df_pyspark['Salary']>=15000)).show()

+-----+---+-----------+------+
| Name|age|Experience |Salary|
+-----+---+-----------+------+
|Sunny| 29|          4| 15000|
| Paul| 31|          7| 19000|
+-----+---+-----------+------+



In [83]:
#not operation
df_pyspark.filter(~(df_pyspark['Salary']<=20000)).show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



Pyspark with python groupby and aggregate function

In [84]:
from pyspark.sql import SparkSession

In [85]:
spark = SparkSession.builder.appName('Agg').getOrCreate()

In [86]:
df_pyspark=spark.read.csv('/content/test3.csv',header=True,inferSchema=True)

In [87]:
df_pyspark.show()

+--------+------------+------+
|    Name| Departments|Salary|
+--------+------------+------+
|   Krish|Data Science| 85000|
|Sudhansh|          BA|670000|
|   Sunny|Data Science| 42000|
|    Paul|          BA| 55000|
|   James|Data Analyst| 35000|
|    Kemi|Data Science| 80000|
|     Ade|          BA| 42000|
|   Rermi|Data Analyst| 35000|
|    Alex|Data Science| 60000|
+--------+------------+------+



In [88]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [90]:
##Group by finction
df_pyspark.groupBy('Name').sum()

DataFrame[Name: string, sum(Salary): bigint]

In [91]:
##group by to find the max saalry
df_pyspark.groupBy('Name').sum().show()

+--------+-----------+
|    Name|sum(Salary)|
+--------+-----------+
|   James|      35000|
|    Kemi|      80000|
|   Rermi|      35000|
|    Alex|      60000|
|     Ade|      42000|
|   Sunny|      42000|
|   Krish|      85000|
|Sudhansh|     670000|
|    Paul|      55000|
+--------+-----------+



In [92]:
##grouop by department which gives max salary
df_pyspark.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(Salary)|
+------------+-----------+
|          BA|     767000|
|Data Analyst|      70000|
|Data Science|     267000|
+------------+-----------+



In [93]:
##to find the mean of departments
df_pyspark.groupBy('Departments').mean().show()

+------------+------------------+
| Departments|       avg(Salary)|
+------------+------------------+
|          BA|255666.66666666666|
|Data Analyst|           35000.0|
|Data Science|           66750.0|
+------------+------------------+



In [94]:
#how mnay employyess working based on department
df_pyspark.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|          BA|    3|
|Data Analyst|    2|
|Data Science|    4|
+------------+-----+



In [95]:
#to find the sum of dsalary
df_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|    1104000|
+-----------+



In [96]:
#who is getting the max salary
df_pyspark.groupBy('Name').max().show()

+--------+-----------+
|    Name|max(Salary)|
+--------+-----------+
|   James|      35000|
|    Kemi|      80000|
|   Rermi|      35000|
|    Alex|      60000|
|     Ade|      42000|
|   Sunny|      42000|
|   Krish|      85000|
|Sudhansh|     670000|
|    Paul|      55000|
+--------+-----------+



In [97]:
#who is getting the min salary
df_pyspark.groupBy('Name').min().show()

+--------+-----------+
|    Name|min(Salary)|
+--------+-----------+
|   James|      35000|
|    Kemi|      80000|
|   Rermi|      35000|
|    Alex|      60000|
|     Ade|      42000|
|   Sunny|      42000|
|   Krish|      85000|
|Sudhansh|     670000|
|    Paul|      55000|
+--------+-----------+



In [98]:
#to find the avg salary
df_pyspark.groupBy('Name').avg().show()

+--------+-----------+
|    Name|avg(Salary)|
+--------+-----------+
|   James|    35000.0|
|    Kemi|    80000.0|
|   Rermi|    35000.0|
|    Alex|    60000.0|
|     Ade|    42000.0|
|   Sunny|    42000.0|
|   Krish|    85000.0|
|Sudhansh|   670000.0|
|    Paul|    55000.0|
+--------+-----------+



based on age and experience predicting the salry

In [99]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Missing').getOrCreate()

In [102]:
#to read the data
training = spark.read.csv('/content/test2.csv',header=True,inferSchema=True)

In [103]:
training.show()

+--------+---+-----------+------+
|    Name|age|Experience |Salary|
+--------+---+-----------+------+
|   Krish| 31|         10| 30000|
|Sudhansh| 30|          8| 25000|
|   Sunny| 29|          4| 15000|
|    Paul| 31|          7| 19000|
|   James| 42|          5| 24000|
+--------+---+-----------+------+



In [104]:
training.columns

['Name', 'age', 'Experience ', 'Salary']

In [106]:
from pyspark.ml.feature import VectorAssembler #helps to group my independent features
featureassembler = VectorAssembler(inputCols=['age','Experience '],outputCol='Independent Features')

In [107]:
output = featureassembler.transform(training)

In [108]:
output.show()

+--------+---+-----------+------+--------------------+
|    Name|age|Experience |Salary|Independent Features|
+--------+---+-----------+------+--------------------+
|   Krish| 31|         10| 30000|         [31.0,10.0]|
|Sudhansh| 30|          8| 25000|          [30.0,8.0]|
|   Sunny| 29|          4| 15000|          [29.0,4.0]|
|    Paul| 31|          7| 19000|          [31.0,7.0]|
|   James| 42|          5| 24000|          [42.0,5.0]|
+--------+---+-----------+------+--------------------+



In [109]:
output.columns

['Name', 'age', 'Experience ', 'Salary', 'Independent Features']

In [110]:
finalized_data = output.select('Independent Features','Salary')

In [111]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 15000|
|          [31.0,7.0]| 19000|
|          [42.0,5.0]| 24000|
+--------------------+------+



In [112]:
from pyspark.ml.regression import LinearRegression
##training test split of my data my training is 75% and test 25%
train_data,test_data = finalized_data.randomSplit([0.75,0.25])
regressor = LinearRegression(featuresCol='Independent Features',labelCol='Salary')
regressor = regressor.fit(train_data)

In [113]:
##coefficient of the data
regressor.coefficients

DenseVector([593.7122, 2277.5091])

In [114]:
##Intercepts
regressor.intercept

-12544.740024183944

In [118]:
##pediction
pred_results = regressor.evaluate(test_data)

In [119]:
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [30.0,8.0]| 25000|23486.698911729145|
+--------------------+------+------------------+



In [120]:
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(1513.301088270855, 2290080.183761754)