In [2]:
import pyspark

In [2]:
pyspark.__version__

'3.1.2'

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName(name='practice').getOrCreate()

In [5]:
spark

In [6]:
# Readin the csv file with the first row as header
df = spark.read.csv('Data.csv', header=True)

In [7]:
df.show()

+-------+----+------+---------+
|Country| Age|Salary|Purchased|
+-------+----+------+---------+
| France|  44| 72000|       No|
|  Spain|  27| 48000|      Yes|
|Germany|  30| 54000|       No|
|  Spain|  38| 61000|       No|
|Germany|  40|  null|      Yes|
| France|  35| 58000|      Yes|
|  Spain|null| 52000|       No|
| France|  48| 79000|      Yes|
|Germany|  50| 83000|       No|
| France|  37| 67000|      Yes|
+-------+----+------+---------+



In [8]:
# same as info in pandas
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Purchased: string (nullable = true)



In [9]:
import pandas as pd

In [10]:
dff = pd.read_csv('Data.csv')

In [11]:
dff.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Country    10 non-null     object 
 1   Age        9 non-null      float64
 2   Salary     9 non-null      float64
 3   Purchased  10 non-null     object 
dtypes: float64(2), object(2)
memory usage: 448.0+ bytes


In [12]:
df.select('Country').show()

+-------+
|Country|
+-------+
| France|
|  Spain|
|Germany|
|  Spain|
|Germany|
| France|
|  Spain|
| France|
|Germany|
| France|
+-------+



In [18]:
# same as SQL but the collect method return the values as list
df.select('*').collect()

[Row(Country='France', Age='44', Salary='72000', Purchased='No'),
 Row(Country='Spain', Age='27', Salary='48000', Purchased='Yes'),
 Row(Country='Germany', Age='30', Salary='54000', Purchased='No'),
 Row(Country='Spain', Age='38', Salary='61000', Purchased='No'),
 Row(Country='Germany', Age='40', Salary=None, Purchased='Yes'),
 Row(Country='France', Age='35', Salary='58000', Purchased='Yes'),
 Row(Country='Spain', Age=None, Salary='52000', Purchased='No'),
 Row(Country='France', Age='48', Salary='79000', Purchased='Yes'),
 Row(Country='Germany', Age='50', Salary='83000', Purchased='No'),
 Row(Country='France', Age='37', Salary='67000', Purchased='Yes')]

In [19]:
df.select(['Country', 'Salary']).show()

+-------+------+
|Country|Salary|
+-------+------+
| France| 72000|
|  Spain| 48000|
|Germany| 54000|
|  Spain| 61000|
|Germany|  null|
| France| 58000|
|  Spain| 52000|
| France| 79000|
|Germany| 83000|
| France| 67000|
+-------+------+



In [20]:
# return the type of columns
df.dtypes

[('Country', 'string'),
 ('Age', 'string'),
 ('Salary', 'string'),
 ('Purchased', 'string')]

In [23]:
# return describe of data same as pandas
df.describe().show()

+-------+-------+-----------------+------------------+---------+
|summary|Country|              Age|            Salary|Purchased|
+-------+-------+-----------------+------------------+---------+
|  count|     10|                9|                 9|       10|
|   mean|   null|38.77777777777778| 63777.77777777778|     null|
| stddev|   null|7.693792591722529|12265.579661982732|     null|
|    min| France|               27|             48000|       No|
|    max|  Spain|               50|             83000|      Yes|
+-------+-------+-----------------+------------------+---------+



In [25]:
# pandas
dff.describe()

Unnamed: 0,Age,Salary
count,9.0,9.0
mean,38.777778,63777.777778
std,7.693793,12265.579662
min,27.0,48000.0
25%,35.0,54000.0
50%,38.0,61000.0
75%,44.0,72000.0
max,50.0,83000.0


## Adding columns in the data frame Spark

- this method doesn't support inplace as pandas it return nes df 

In [30]:
df = df.withColumn('Age after 2 years', df['Age'] + 2)
df.show()

+-------+----+------+---------+-----------------+
|Country| Age|Salary|Purchased|Age after 2 years|
+-------+----+------+---------+-----------------+
| France|  44| 72000|       No|             46.0|
|  Spain|  27| 48000|      Yes|             29.0|
|Germany|  30| 54000|       No|             32.0|
|  Spain|  38| 61000|       No|             40.0|
|Germany|  40|  null|      Yes|             42.0|
| France|  35| 58000|      Yes|             37.0|
|  Spain|null| 52000|       No|             null|
| France|  48| 79000|      Yes|             50.0|
|Germany|  50| 83000|       No|             52.0|
| France|  37| 67000|      Yes|             39.0|
+-------+----+------+---------+-----------------+



## Droping columns

In [33]:
df = df.drop('Age after 2 years')

In [34]:
df.show()

+-------+----+------+---------+
|Country| Age|Salary|Purchased|
+-------+----+------+---------+
| France|  44| 72000|       No|
|  Spain|  27| 48000|      Yes|
|Germany|  30| 54000|       No|
|  Spain|  38| 61000|       No|
|Germany|  40|  null|      Yes|
| France|  35| 58000|      Yes|
|  Spain|null| 52000|       No|
| France|  48| 79000|      Yes|
|Germany|  50| 83000|       No|
| France|  37| 67000|      Yes|
+-------+----+------+---------+



### Rename Column

In [35]:
df.withColumnRenamed('Age', 'Old').show()

+-------+----+------+---------+
|Country| Old|Salary|Purchased|
+-------+----+------+---------+
| France|  44| 72000|       No|
|  Spain|  27| 48000|      Yes|
|Germany|  30| 54000|       No|
|  Spain|  38| 61000|       No|
|Germany|  40|  null|      Yes|
| France|  35| 58000|      Yes|
|  Spain|null| 52000|       No|
| France|  48| 79000|      Yes|
|Germany|  50| 83000|       No|
| France|  37| 67000|      Yes|
+-------+----+------+---------+



#                                                Missing Values

In [37]:
## drop the colum
df.drop('Country').show()

+----+------+---------+
| Age|Salary|Purchased|
+----+------+---------+
|  44| 72000|       No|
|  27| 48000|      Yes|
|  30| 54000|       No|
|  38| 61000|       No|
|  40|  null|      Yes|
|  35| 58000|      Yes|
|null| 52000|       No|
|  48| 79000|      Yes|
|  50| 83000|       No|
|  37| 67000|      Yes|
+----+------+---------+



- it will drop all row that has at least null value in colum

In [39]:
df.na.drop().show()

+-------+---+------+---------+
|Country|Age|Salary|Purchased|
+-------+---+------+---------+
| France| 44| 72000|       No|
|  Spain| 27| 48000|      Yes|
|Germany| 30| 54000|       No|
|  Spain| 38| 61000|       No|
| France| 35| 58000|      Yes|
| France| 48| 79000|      Yes|
|Germany| 50| 83000|       No|
| France| 37| 67000|      Yes|
+-------+---+------+---------+



In [46]:
df = spark.read.csv('test2.csv', header=True, inferSchema=True)
df.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



- how = all or any it mean if all columns in the rows has null drop the row,
- how = any if mean if just one value null in colmn drop the row

In [48]:
df.na.drop(how='any').show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



<p> - thresh take int number, for exemple thresh=2 it mean if they are mini 2 null value in the row drop it if not keep it 

</p>

In [59]:
df.na.drop(thresh=3).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



- subset take list of colmns names to drop row that their colmns had null

In [61]:
df.na.drop(subset=['age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
|     null| 36|      null|  null|
+---------+---+----------+------+



## Fill the missing values

In [64]:
df.fillna(2).show()
df.na.fill('Missing').show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh|  2|         2| 40000|
|     null| 34|        10| 38000|
|     null| 36|         2|     2|
+---------+---+----------+------+

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|  Missing|  34|        10| 38000|
|  Missing|  36|      null|  null|
+---------+----+----------+------+



- missing value with mean

In [66]:
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean', inputCols=['Experience', 'Salary', 'age'], outputCols=["{}_imputer".format(i) for i in ['Experience', 'Salary', 'age']])

In [67]:
imputer.fit(df).transform(df).show()

+---------+----+----------+------+------------------+--------------+-----------+
|     Name| age|Experience|Salary|Experience_imputer|Salary_imputer|age_imputer|
+---------+----+----------+------+------------------+--------------+-----------+
|    Krish|  31|        10| 30000|                10|         30000|         31|
|Sudhanshu|  30|         8| 25000|                 8|         25000|         30|
|    Sunny|  29|         4| 20000|                 4|         20000|         29|
|     Paul|  24|         3| 20000|                 3|         20000|         24|
|   Harsha|  21|         1| 15000|                 1|         15000|         21|
|  Shubham|  23|         2| 18000|                 2|         18000|         23|
|   Mahesh|null|      null| 40000|                 5|         40000|         28|
|     null|  34|        10| 38000|                10|         38000|         34|
|     null|  36|      null|  null|                 5|         25750|         36|
+---------+----+----------+-

In [68]:
data = imputer.fit(df).transform(df)
data.show()

+---------+----+----------+------+------------------+--------------+-----------+
|     Name| age|Experience|Salary|Experience_imputer|Salary_imputer|age_imputer|
+---------+----+----------+------+------------------+--------------+-----------+
|    Krish|  31|        10| 30000|                10|         30000|         31|
|Sudhanshu|  30|         8| 25000|                 8|         25000|         30|
|    Sunny|  29|         4| 20000|                 4|         20000|         29|
|     Paul|  24|         3| 20000|                 3|         20000|         24|
|   Harsha|  21|         1| 15000|                 1|         15000|         21|
|  Shubham|  23|         2| 18000|                 2|         18000|         23|
|   Mahesh|null|      null| 40000|                 5|         40000|         28|
|     null|  34|        10| 38000|                10|         38000|         34|
|     null|  36|      null|  null|                 5|         25750|         36|
+---------+----+----------+-

<h2><center> The filter operation</center></h2>

In [73]:
data.filter(data.age > 30).show()

+-----+---+----------+------+------------------+--------------+-----------+
| Name|age|Experience|Salary|Experience_imputer|Salary_imputer|age_imputer|
+-----+---+----------+------+------------------+--------------+-----------+
|Krish| 31|        10| 30000|                10|         30000|         31|
| null| 34|        10| 38000|                10|         38000|         34|
| null| 36|      null|  null|                 5|         25750|         36|
+-----+---+----------+------+------------------+--------------+-----------+



In [75]:
data.filter(data.age_imputer > 30).select(['Experience_imputer', 'Salary_imputer']).show()

+------------------+--------------+
|Experience_imputer|Salary_imputer|
+------------------+--------------+
|                10|         30000|
|                10|         38000|
|                 5|         25750|
+------------------+--------------+



<h2><center> The GroupBy operation</center></h2>

In [76]:
df = spark.read.csv('test3.csv', header=True, inferSchema=True)
df.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [81]:
df.groupby(['Departments']).max().show()

+------------+-----------+
| Departments|max(salary)|
+------------+-----------+
|         IOT|      10000|
|    Big Data|       5000|
|Data Science|      20000|
+------------+-----------+



In [82]:
df.groupBy(['name']).sum().show()

+---------+-----------+
|     name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



<h1><center> Spark MLlib </center></h1>

In [90]:
spark.stop()

In [4]:
spark2 = SparkSession.builder.appName(name = 'MlLib').getOrCreate()
spark2

In [5]:
training = spark2.read.csv('test1.csv',header=True,inferSchema=True)

In [6]:
training.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [7]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [8]:
training.columns

['Name', 'age', 'Experience', 'Salary']

- Making the features into vector for ML

In [30]:
from pyspark.ml.feature import VectorAssembler

In [31]:
featureassembler=VectorAssembler(inputCols=["age","Experience"],outputCol="Independent Features")

In [32]:
output=featureassembler.transform(training)

In [33]:
output.show()

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+



In [34]:
final_data = output.select("Independent Features", "salary")
final_data.show()

+--------------------+------+
|Independent Features|salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [35]:
from pyspark.ml.regression import LinearRegression

In [49]:
train_data, test_data = final_data.randomSplit([0.75, 0.25])
ln = LinearRegression(featuresCol="Independent Features", labelCol="salary")

In [50]:
ln = ln.fit(train_data)

In [51]:
ln.coefficients

DenseVector([-58.8235, 1666.6667])

In [52]:
ln.intercept

15196.078431372378

In [53]:
pred_results=ln.evaluate(test_data)

In [55]:
test_data.show()

+--------------------+------+
|Independent Features|salary|
+--------------------+------+
|          [24.0,3.0]| 20000|
|          [30.0,8.0]| 25000|
+--------------------+------+



In [58]:
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|salary|        prediction|
+--------------------+------+------------------+
|          [24.0,3.0]| 20000|18784.313725490192|
|          [30.0,8.0]| 25000|26764.705882352937|
+--------------------+------+------------------+



In [59]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(1490.1960784313724, 2296039.984621296)