In [1]:
#!pip install pyspark

In [2]:
import numpy as np
import pyspark
import pandas as pd
import os
# Move to the folder that holds Test1.csv / Test2.csv (they are read with
# relative paths below). The location can be overridden through the
# PYSPARK_DEMO_DIR environment variable so the notebook also runs on machines
# that lack this Windows path; the original path remains the default.
os.chdir(os.environ.get('PYSPARK_DEMO_DIR', 'C:\\Users\\DR_OM\\Z-Jupyter Extras'))

In [3]:
#pd.read_csv('Test1.csv')

In [4]:
from pyspark.sql import SparkSession

In [5]:
## How to start a session
# Name the application on the builder first, then create the session
# (or pick up one that is already running in this kernel).
session_builder = SparkSession.builder.appName('DataFrame')
spark = session_builder.getOrCreate()

In [6]:
# Show the active SparkSession as the cell output
spark

In [7]:
# Plain read with no options: the header row is loaded as an ordinary data row
# and the columns receive the generated names _c0.._c3 (see the output below).
csv_reader = spark.read
df_pyspark = csv_reader.csv('Test1.csv')
df_pyspark.show()

+--------+----+----------+------+
|     _c0| _c1|       _c2|   _c3|
+--------+----+----------+------+
|    Name| Age|Experience|Salary|
|   Krish|  31|        10| 30000|
|Sudhansh|  30|         8| 25000|
|   Sunny|  29|         4| 20000|
|    Paul|  24|         3| 20000|
|  Harsha|  21|         1| 15000|
| Shubham|  23|         2| 18000|
|   Mahes|NULL|      NULL| 40000|
|    NULL|  34|        10| 38000|
|    NULL|  36|      NULL|  NULL|
+--------+----+----------+------+



In [8]:
## Set the header of the dataset
# header=True takes the column names from the first row of the file.
# inferSchema=True lets Spark assign a dtype to each column; printSchema() displays the result.
df_pyspark = spark.read.csv('Test1.csv', header=True, inferSchema=True)
#df_pyspark.show()

In [9]:
# Print each column name together with the dtype chosen by inferSchema
df_pyspark.printSchema()
#df_pyspark.columns

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [10]:
#df_pyspark.select('Name').show()
#df_pyspark.select(['Name', 'Age']).show()
#df_pyspark.dtypes
#df_pyspark.describe().show()

In [11]:
## Adding Columns in data frame
# Build the column expression first, then attach it under its new name.
experience_plus_two = df_pyspark['Experience'] + 2
df_pyspark = df_pyspark.withColumn('Experience After 2 years', experience_plus_two)
#df_pyspark.show()

In [12]:
## Drop the columns
# Remove the derived column that the previous cell added.
derived_column = 'Experience After 2 years'
df_pyspark = df_pyspark.drop(derived_column)
#df_pyspark.show()

In [13]:
## Rename the column
#df_pyspark = df_pyspark.withColumnRenamed('Name', 'New Name')
#df_pyspark.show()

In [14]:
## Drop every row that contains at least one null value
#df_pyspark.na.drop().show()

In [15]:
## Threshold: drop rows that have fewer than `thresh` non-null values.
#df_pyspark.na.drop(thresh=3).show()
#df_pyspark.na.drop(thresh=5).show()


In [16]:
## Subset
#df_pyspark.show()
#df_pyspark.na.drop(subset=['Experience']).show()
#df_pyspark.na.drop(subset=['Age']).show()

In [17]:
## Filling missing values
## A string fill value is applied to string-typed columns only
#df_pyspark.na.fill("Missing").show()

In [18]:
from pyspark.ml.feature import Imputer

# Columns to impute, and the names of the columns that receive the imputed values.
numeric_cols = ['Age', 'Experience', 'Salary']
imputed_cols = list(map("{}_imputed".format, numeric_cols))

# Two imputers over the same columns that differ only in strategy.
imputer = Imputer(inputCols=numeric_cols, outputCols=imputed_cols).setStrategy('mean')
imputer2 = Imputer(inputCols=numeric_cols, outputCols=imputed_cols).setStrategy('median')

In [19]:
## Add imputation columns to df
#imputer.fit(df_pyspark).transform(df_pyspark).show()
#imputer2.fit(df_pyspark).transform(df_pyspark).show()

In [20]:
## Filter
# Keep only rows with at least 4 non-null values; df_pyspark2 is the frame
# used by the filter examples and by the regression further down.
df_pyspark2 = df_pyspark.dropna(thresh=4)
#df_pyspark2.show()

In [21]:
## Salary of people less than or equal to 20,000
#df_pyspark2.filter('Salary<=20000').show()

In [22]:
## Select specific columns from filter
#df_pyspark2.filter('Salary<=20000').select(['Name', 'Age']).show()
#df_pyspark2.filter(df_pyspark2['Salary']<=20000).show()

In [23]:
#df_pyspark2.filter((df_pyspark2['Salary']<=20000) & (df_pyspark2['Age']>23)).show()
# not symbol ~
#df_pyspark2.filter((df_pyspark2['Salary']<=20000) & ~(df_pyspark2['Age']>23)).show()

In [24]:
# Second dataset, read with a header row and inferred column dtypes
df_pyspark3 = spark.read.csv('Test2.csv', header=True, inferSchema=True)

In [25]:
#df_pyspark3.show()

In [26]:
## Groupby and Aggregate Functions
#df_pyspark3.printSchema()

In [27]:
## Groupby
#df_pyspark3.groupBy('Name').sum().show()

## Groupby departments for total (summed) salary
#df_pyspark3.groupBy('Departments').sum().show()

## Groupby departments by mean salary
#df_pyspark3.groupBy('Departments').mean().show()

In [28]:
#df_pyspark3.groupBy('Name').agg({'Salary':'sum'}).show()

In [29]:
# Show the null-filtered frame (built earlier with na.drop(thresh=4))
df_pyspark2.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Krish| 31|        10| 30000|
|Sudhansh| 30|         8| 25000|
|   Sunny| 29|         4| 20000|
|    Paul| 24|         3| 20000|
|  Harsha| 21|         1| 15000|
| Shubham| 23|         2| 18000|
+--------+---+----------+------+



#### ['Age', 'Experience'] ---> assembled into one vector column ---> the independent feature

In [30]:
from pyspark.ml.feature import VectorAssembler
# Pack the predictor columns into a single vector column named 'Independent Feature'
predictor_cols = ['Age', 'Experience']
feature_assembler = VectorAssembler(
    inputCols=predictor_cols,
    outputCol='Independent Feature',
)

In [31]:
# Append the assembled vector column to the null-filtered rows
output = feature_assembler.transform(df_pyspark2)
output.show()
# Last expression of the cell: the column list, now including 'Independent Feature'
output.columns

+--------+---+----------+------+-------------------+
|    Name|Age|Experience|Salary|Independent Feature|
+--------+---+----------+------+-------------------+
|   Krish| 31|        10| 30000|        [31.0,10.0]|
|Sudhansh| 30|         8| 25000|         [30.0,8.0]|
|   Sunny| 29|         4| 20000|         [29.0,4.0]|
|    Paul| 24|         3| 20000|         [24.0,3.0]|
|  Harsha| 21|         1| 15000|         [21.0,1.0]|
| Shubham| 23|         2| 18000|         [23.0,2.0]|
+--------+---+----------+------+-------------------+



['Name', 'Age', 'Experience', 'Salary', 'Independent Feature']

In [32]:
# Keep just the two columns the regression needs: the feature vector and the label
model_columns = ['Independent Feature', 'Salary']
finalized_data = output.select(*model_columns)
finalized_data.show()

+-------------------+------+
|Independent Feature|Salary|
+-------------------+------+
|        [31.0,10.0]| 30000|
|         [30.0,8.0]| 25000|
|         [29.0,4.0]| 20000|
|         [24.0,3.0]| 20000|
|         [21.0,1.0]| 15000|
|         [23.0,2.0]| 18000|
+-------------------+------+



In [33]:
from pyspark.ml.regression import LinearRegression
# Fix the seed so the train/test split is not re-drawn at random on every run;
# without it the coefficients and error metrics below change on each execution.
SPLIT_SEED = 42
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Keep the unfitted estimator under its own name. `regressor` stays bound to the
# fitted model because the following cells read its coefficients, intercept
# and evaluate() from that name.
lr_estimator = LinearRegression(featuresCol='Independent Feature', labelCol='Salary')
regressor = lr_estimator.fit(train_data)

In [34]:
## Coefficients
# One weight per entry of the assembled feature vector (Age, Experience)
regressor.coefficients

DenseVector([-2087.7193, 3561.4035])

In [35]:
## Intercept
# Constant term of the fitted linear model
regressor.intercept

59140.35087715416

In [36]:
## Prediction
# evaluate() scores the held-out split; the result carries both the per-row
# predictions shown here and the error metrics read in the next cell.
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show()

+-------------------+------+------------------+
|Independent Feature|Salary|        prediction|
+-------------------+------+------------------+
|         [21.0,1.0]| 15000|18859.649122805153|
|         [29.0,4.0]| 20000|12842.105263165002|
+-------------------+------+------------------+



In [37]:
# Mean absolute error and mean squared error on the test split, displayed as a tuple
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(5508.7719298200755, 33066174.20739037)