In [68]:
import pyspark
import pandas as pd
import warnings
with warnings.catch_warnings():
    warnings.simplefilter("ignore")

In [69]:
type(pd.read_csv('test1.csv'))

pandas.core.frame.DataFrame

In [70]:
from pyspark.sql import SparkSession

In [71]:
spark=SparkSession.builder.appName('Practise').getOrCreate()

In [72]:
spark

In [73]:
df_pyspark=spark.read.csv('test1.csv')

In [74]:
df_pyspark=spark.read.option('header','true').csv('test1.csv')

In [75]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [76]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



### Reading the dataset

In [77]:
df_pyspark=spark.read.option('header','true').csv('test1.csv',inferSchema=True)

### Checking the schema

In [78]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [79]:
df_pyspark=spark.read.csv('test1.csv',header=True,inferSchema=True)
df_pyspark.show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



In [80]:
### Check the schema
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [81]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [82]:
df_pyspark.head(3)

[Row(Name='Adit', age=31, Experience=10, Salary=30000),
 Row(Name='Paul', age=30, Experience=8, Salary=25000),
 Row(Name='Alan', age=29, Experience=4, Salary=20000)]

In [83]:
df_pyspark.show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



In [84]:
df_pyspark.select(['Name','Experience']).show()

+------+----------+
|  Name|Experience|
+------+----------+
|  Adit|        10|
|  Paul|         8|
|  Alan|         4|
|Shakib|         3|
|Zubair|         1|
|Porter|         2|
+------+----------+



In [85]:
df_pyspark['Name']

Column<'Name'>

In [86]:
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [87]:
df_pyspark.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  NULL|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  NULL| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|  Adit|                21|                1|             15000|
|    max|Zubair|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



### Adding Columns in data frame

In [88]:
df_pyspark=df_pyspark.withColumn('Experience After 2 year',df_pyspark['Experience']+2)

In [89]:
df_pyspark.show()

+------+---+----------+------+-----------------------+
|  Name|age|Experience|Salary|Experience After 2 year|
+------+---+----------+------+-----------------------+
|  Adit| 31|        10| 30000|                     12|
|  Paul| 30|         8| 25000|                     10|
|  Alan| 29|         4| 20000|                      6|
|Shakib| 24|         3| 20000|                      5|
|Zubair| 21|         1| 15000|                      3|
|Porter| 23|         2| 18000|                      4|
+------+---+----------+------+-----------------------+



### Dropping the columns

In [90]:
df_pyspark=df_pyspark.drop('Experience After 2 year')

In [91]:
df_pyspark.show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



### Rename the columns

In [92]:
df_pyspark.withColumnRenamed('Name','New Name').show()

+--------+---+----------+------+
|New Name|age|Experience|Salary|
+--------+---+----------+------+
|    Adit| 31|        10| 30000|
|    Paul| 30|         8| 25000|
|    Alan| 29|         4| 20000|
|  Shakib| 24|         3| 20000|
|  Zubair| 21|         1| 15000|
|  Porter| 23|         2| 18000|
+--------+---+----------+------+



In [93]:
df_pyspark=spark.read.csv('test2.csv',header=True,inferSchema=True)

In [94]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [95]:
df_pyspark.show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
|  Adit|  31|        10| 30000|
|  Paul|  30|         8| 25000|
|  Alan|  29|         4| 20000|
|Shakib|  24|         3| 20000|
|Zubair|  21|         1| 15000|
|Porter|  23|         2| 18000|
| David|NULL|      NULL| 40000|
|  NULL|  34|        10| 38000|
|  NULL|  36|      NULL|  NULL|
+------+----+----------+------+



### Dropping the columns

In [96]:
df_pyspark.drop('Name').show()

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|NULL|      NULL| 40000|
|  34|        10| 38000|
|  36|      NULL|  NULL|
+----+----------+------+



In [97]:
df_pyspark.show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
|  Adit|  31|        10| 30000|
|  Paul|  30|         8| 25000|
|  Alan|  29|         4| 20000|
|Shakib|  24|         3| 20000|
|Zubair|  21|         1| 15000|
|Porter|  23|         2| 18000|
| David|NULL|      NULL| 40000|
|  NULL|  34|        10| 38000|
|  NULL|  36|      NULL|  NULL|
+------+----+----------+------+



In [98]:
df_pyspark.na.drop().show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



In [99]:
df_pyspark.na.drop(how="any").show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



### Setting threshold

In [100]:
df_pyspark.na.drop(how="any",thresh=3).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
|  NULL| 34|        10| 38000|
+------+---+----------+------+



### Subset

In [101]:
df_pyspark.na.drop(how="any",subset=['Age']).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
|  NULL| 34|        10| 38000|
|  NULL| 36|      NULL|  NULL|
+------+---+----------+------+



### Filling the Missing Value

In [102]:
df_pyspark.na.fill('Missing Values',['Experience','age']).show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
|  Adit|  31|        10| 30000|
|  Paul|  30|         8| 25000|
|  Alan|  29|         4| 20000|
|Shakib|  24|         3| 20000|
|Zubair|  21|         1| 15000|
|Porter|  23|         2| 18000|
| David|NULL|      NULL| 40000|
|  NULL|  34|        10| 38000|
|  NULL|  36|      NULL|  NULL|
+------+----+----------+------+



In [103]:
df_pyspark.show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
|  Adit|  31|        10| 30000|
|  Paul|  30|         8| 25000|
|  Alan|  29|         4| 20000|
|Shakib|  24|         3| 20000|
|Zubair|  21|         1| 15000|
|Porter|  23|         2| 18000|
| David|NULL|      NULL| 40000|
|  NULL|  34|        10| 38000|
|  NULL|  36|      NULL|  NULL|
+------+----+----------+------+



In [104]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'], 
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
    ).setStrategy("median")

# Adding imputation columns to the DataFrame


In [105]:
imputer.fit(df_pyspark).transform(df_pyspark).show()

+------+----+----------+------+-----------+------------------+--------------+
|  Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+------+----+----------+------+-----------+------------------+--------------+
|  Adit|  31|        10| 30000|         31|                10|         30000|
|  Paul|  30|         8| 25000|         30|                 8|         25000|
|  Alan|  29|         4| 20000|         29|                 4|         20000|
|Shakib|  24|         3| 20000|         24|                 3|         20000|
|Zubair|  21|         1| 15000|         21|                 1|         15000|
|Porter|  23|         2| 18000|         23|                 2|         18000|
| David|NULL|      NULL| 40000|         29|                 4|         40000|
|  NULL|  34|        10| 38000|         34|                10|         38000|
|  NULL|  36|      NULL|  NULL|         36|                 4|         20000|
+------+----+----------+------+-----------+------------------+--

In [106]:
df_pyspark=spark.read.csv('test1.csv',header=True,inferSchema=True)
df_pyspark.show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



In [107]:
### Salary of the people less than or equal to 20000
df_pyspark.filter("Salary<=20000").show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



In [108]:
df_pyspark.filter("Salary<=20000").select(['Name','age']).show()

+------+---+
|  Name|age|
+------+---+
|  Alan| 29|
|Shakib| 24|
|Zubair| 21|
|Porter| 23|
+------+---+



In [109]:
df_pyspark.filter(df_pyspark['Salary']<=20000).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



In [110]:
df_pyspark.filter((df_pyspark['Salary']<=20000) | 
                  (df_pyspark['Salary']>=15000)).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



In [111]:
df_pyspark.filter(~(df_pyspark['Salary']<=20000)).show()

+----+---+----------+------+
|Name|age|Experience|Salary|
+----+---+----------+------+
|Adit| 31|        10| 30000|
|Paul| 30|         8| 25000|
+----+---+----------+------+



In [112]:
df_pyspark=spark.read.csv('test3.csv',header=True,inferSchema=True)
df_pyspark.show()

+-----+------------+------+
| Name| Departments|salary|
+-----+------------+------+
| Adit|Data Science| 10000|
| Adit|         IOT|  5000|
|David|    Big Data|  4000|
| Adit|    Big Data|  4000|
|David|Data Science|  3000|
| Alan|Data Science| 20000|
| Alan|         IOT| 10000|
| Alan|    Big Data|  5000|
|  Bob|Data Science| 10000|
|  Bob|    Big Data|  2000|
+-----+------------+------+



In [113]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



### Groupby


In [114]:
### Grouped to find the maximum salary
df_pyspark.groupBy('Name').sum().show()

+-----+-----------+
| Name|sum(salary)|
+-----+-----------+
|  Bob|      12000|
| Alan|      35000|
| Adit|      19000|
|David|       7000|
+-----+-----------+



In [115]:
df_pyspark.groupBy('Name').avg().show()

+-----+------------------+
| Name|       avg(salary)|
+-----+------------------+
|  Bob|            6000.0|
| Alan|11666.666666666666|
| Adit| 6333.333333333333|
|David|            3500.0|
+-----+------------------+



In [116]:
### Groupby Departmernts  which gives maximum salary
df_pyspark.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [117]:
df_pyspark.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [118]:
df_pyspark.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [119]:
df_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+



In [120]:
training = spark.read.csv('test1.csv',header=True,inferSchema=True)
training.show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|  Adit| 31|        10| 30000|
|  Paul| 30|         8| 25000|
|  Alan| 29|         4| 20000|
|Shakib| 24|         3| 20000|
|Zubair| 21|         1| 15000|
|Porter| 23|         2| 18000|
+------+---+----------+------+



In [121]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [122]:
training.columns

['Name', 'age', 'Experience', 'Salary']

In [123]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["age","Experience"],outputCol="Independent Features")

In [124]:
output=featureassembler.transform(training)

In [125]:
output.show()

+------+---+----------+------+--------------------+
|  Name|age|Experience|Salary|Independent Features|
+------+---+----------+------+--------------------+
|  Adit| 31|        10| 30000|         [31.0,10.0]|
|  Paul| 30|         8| 25000|          [30.0,8.0]|
|  Alan| 29|         4| 20000|          [29.0,4.0]|
|Shakib| 24|         3| 20000|          [24.0,3.0]|
|Zubair| 21|         1| 15000|          [21.0,1.0]|
|Porter| 23|         2| 18000|          [23.0,2.0]|
+------+---+----------+------+--------------------+



In [126]:
output.columns

['Name', 'age', 'Experience', 'Salary', 'Independent Features']

In [127]:
finalized_data=output.select("Independent Features","Salary")

In [128]:
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [129]:
from pyspark.ml.regression import LinearRegression
##train test split
train_data,test_data=finalized_data.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor=regressor.fit(train_data)

23/11/09 13:42:32 WARN Instrumentation: [f67bd394] regParam is zero, which might cause numerical instability and overfitting.


### Coefficients

In [130]:
regressor.coefficients

DenseVector([-263.7076, 1767.624])

### Intercepts

In [131]:
regressor.intercept

19919.060052212404

### Prediction

In [132]:
pred_results=regressor.evaluate(test_data)

In [133]:
pred_results.predictions.show()

+--------------------+------+-----------------+
|Independent Features|Salary|       prediction|
+--------------------+------+-----------------+
|          [29.0,4.0]| 20000|19342.03655352618|
+--------------------+------+-----------------+



In [134]:
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(657.9634464738192, 432915.89689570636)