In [3]:
import os
from pyspark.sql import SparkSession

# Set the Java home
os.environ['JAVA_HOME'] = r'C:\Program Files\OpenLogic\jdk-11.0.24.8-hotspot'

# Create Spark session
spark = SparkSession.builder \
    .appName('practice') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql_2.12:3.3.1') \
    .getOrCreate()

#### when running in the local you have one master node

In [5]:
spark

In [8]:
file_path = r'C:\Users\Admin\Desktop\testdata.csv'
df_pyspark=spark.read.csv(file_path)

In [9]:
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string]

In [10]:
df_pyspark.show()

+---------+---+----------+
|      _c0|_c1|       _c2|
+---------+---+----------+
|     Name|Age|Experience|
|Sharwaree| 20|        10|
|  Sarthak| 17|         8|
|   Advika| 10|         4|
+---------+---+----------+



 here we have changed the default column values of c0 and c1 to actual vals name & age

In [12]:
df_pyspark=spark.read.option('header','true').csv(file_path)
df_pyspark.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|Sharwaree| 20|        10|
|  Sarthak| 17|         8|
|   Advika| 10|         4|
+---------+---+----------+



In [15]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [17]:
df_pyspark.head(4)

[Row(Name='Sharwaree', Age='20', Experience='10'),
 Row(Name='Sarthak', Age='17', Experience='8'),
 Row(Name='Advika', Age='10', Experience='4')]

In [21]:
#similar to df.info() 
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Experience: string (nullable = true)



In [23]:
#to remove the default option of string use inferschema, now the datatype of age is integer
df_pyspark=spark.read.option('header','true').csv(file_path,inferSchema=True)
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



alternate way of reading

In [26]:
df_pyspark=spark.read.csv(file_path,header=True,inferSchema=True)
df_pyspark.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|Sharwaree| 20|        10|
|  Sarthak| 17|         8|
|   Advika| 10|         4|
+---------+---+----------+



In [27]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [30]:
#column selection
df_pyspark.select('Name')

DataFrame[Name: string]

In [32]:
df_pyspark.select(['Name','Experience']).show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|Sharwaree|        10|
|  Sarthak|         8|
|   Advika|         4|
+---------+----------+



In [34]:
df_pyspark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [36]:
df_pyspark.describe().show()

+-------+---------+------------------+-----------------+
|summary|     Name|               Age|       Experience|
+-------+---------+------------------+-----------------+
|  count|        3|                 3|                3|
|   mean|     NULL|15.666666666666666|7.333333333333333|
| stddev|     NULL| 5.131601439446884|3.055050463303893|
|    min|   Advika|                10|                4|
|    max|Sharwaree|                20|               10|
+-------+---------+------------------+-----------------+



In [37]:
#adding colms , not an inplace op 
df_pyspark=df_pyspark.withColumn('Experience after 2 yrs',df_pyspark['Experience']+2)
df_pyspark.show()

+---------+---+----------+----------------------+
|     Name|Age|Experience|Experience after 2 yrs|
+---------+---+----------+----------------------+
|Sharwaree| 20|        10|                    12|
|  Sarthak| 17|         8|                    10|
|   Advika| 10|         4|                     6|
+---------+---+----------+----------------------+



In [38]:
## dropping colmns
df_pyspark.drop('Experience after 2 yrs').show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|Sharwaree| 20|        10|
|  Sarthak| 17|         8|
|   Advika| 10|         4|
+---------+---+----------+



drop is not an inplace op so

In [43]:
df_pyspark=df_pyspark.drop('Experience after 2 yrs')

In [45]:
df_pyspark.show()

+---------+---+----------+
|     Name|Age|Experience|
+---------+---+----------+
|Sharwaree| 20|        10|
|  Sarthak| 17|         8|
|   Advika| 10|         4|
+---------+---+----------+



In [47]:
#Renaming the colm
df_pyspark.withColumnRenamed('Experience','Exp').show()

+---------+---+---+
|     Name|Age|Exp|
+---------+---+---+
|Sharwaree| 20| 10|
|  Sarthak| 17|  8|
|   Advika| 10|  4|
+---------+---+---+



In [49]:
filepath=r'C:\Users\Admin\Desktop\test2.csv'
df=spark.read.csv(filepath,header=True,inferSchema=True)
df.show()

+-----+----+------+
| Name| Age|Salary|
+-----+----+------+
| Sara|  20| 30000|
| Arya|  20| 60000|
|manas|  20|  NULL|
| NULL|NULL|  NULL|
| Ashi|NULL| 40000|
|Tashi|NULL|  NULL|
+-----+----+------+



In [51]:
#dropping null vals 
df.na.drop().show()

+----+---+------+
|Name|Age|Salary|
+----+---+------+
|Sara| 20| 30000|
|Arya| 20| 60000|
+----+---+------+



In [53]:
df.na.drop(how="any").show()

+----+---+------+
|Name|Age|Salary|
+----+---+------+
|Sara| 20| 30000|
|Arya| 20| 60000|
+----+---+------+



In [55]:
df.na.drop(how="all").show()

+-----+----+------+
| Name| Age|Salary|
+-----+----+------+
| Sara|  20| 30000|
| Arya|  20| 60000|
|manas|  20|  NULL|
| Ashi|NULL| 40000|
|Tashi|NULL|  NULL|
+-----+----+------+



In [57]:
#threshold if thresh=2 display all rows with atleast 2 non null vals 
df.na.drop(thresh=2).show()

+-----+----+------+
| Name| Age|Salary|
+-----+----+------+
| Sara|  20| 30000|
| Arya|  20| 60000|
|manas|  20|  NULL|
| Ashi|NULL| 40000|
+-----+----+------+



In [59]:

#subset -delete null vals only from salary colm
df.na.drop(how="any",subset=['Salary']).show()

+----+----+------+
|Name| Age|Salary|
+----+----+------+
|Sara|  20| 30000|
|Arya|  20| 60000|
|Ashi|NULL| 40000|
+----+----+------+



In [61]:
# fill (val,subset=None)
#string vals
df=df.na.fill('missing vals')
df.show()

+------------+----+------+
|        Name| Age|Salary|
+------------+----+------+
|        Sara|  20| 30000|
|        Arya|  20| 60000|
|       manas|  20|  NULL|
|missing vals|NULL|  NULL|
|        Ashi|NULL| 40000|
|       Tashi|NULL|  NULL|
+------------+----+------+



#for integer
#here only age col will be replaced with 0 salary will still have null vals
df=df.na.fill(0,subset=['Age'])
df.show()

In [64]:
# replacing with mean
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['Age','Salary'],
    outputCols=["{}_imputed".format(c) for c in ['Age','Salary']]).setStrategy("mean")

In [66]:
imputer.fit(df).transform(df).show()

+------------+----+------+-----------+--------------+
|        Name| Age|Salary|Age_imputed|Salary_imputed|
+------------+----+------+-----------+--------------+
|        Sara|  20| 30000|         20|         30000|
|        Arya|  20| 60000|         20|         60000|
|       manas|  20|  NULL|         20|         43333|
|missing vals|NULL|  NULL|         20|         43333|
|        Ashi|NULL| 40000|         20|         40000|
|       Tashi|NULL|  NULL|         20|         43333|
+------------+----+------+-----------+--------------+



#### filter operations

In [69]:
df.filter("Salary<=40000").show()

+----+----+------+
|Name| Age|Salary|
+----+----+------+
|Sara|  20| 30000|
|Ashi|NULL| 40000|
+----+----+------+



In [71]:
df.filter("Salary>50000").select(['Name']).show() #select can take multiple cols 

+----+
|Name|
+----+
|Arya|
+----+



In [73]:
#multiple filters using and,or ..
df.filter((df['Salary']>30000) &
          (df['Salary']<60000)).show()

+----+----+------+
|Name| Age|Salary|
+----+----+------+
|Ashi|NULL| 40000|
+----+----+------+



In [75]:
df.filter(~((df['Salary']>30000) & (df['Salary']<60000))).show() #salary greater than 30000 and less than 60000 is 40000 so
#implying not,it gives sara,arya

+----+---+------+
|Name|Age|Salary|
+----+---+------+
|Sara| 20| 30000|
|Arya| 20| 60000|
+----+---+------+



###### GroupBy and Aggregate Fucns(Pre-processing)

In [78]:
file=r'C:\Users\Admin\Desktop\test3.csv'
df_o=spark.read.csv(file,header=True,inferSchema=True)
df_o.show()

+-----+----------+-------+
| Name|Department| Salary|
+-----+----------+-------+
| Kaya|  Data Sci|1000000|
| Kaya|       IOT| 100000|
|Arnav|  Data Sci| 850000|
|Kunal|       IOT| 700000|
|Kunal|      ENTC| 600000|
|Arnav| Big Data | 950000|
|Arnav|       IOT| 750000|
| Kaya| Big Data | 900000|
+-----+----------+-------+



In [80]:
#group to find max salary
df_o.groupBy('Name').sum().show()

+-----+-----------+
| Name|sum(Salary)|
+-----+-----------+
|Kunal|    1300000|
|Arnav|    2550000|
| Kaya|    2000000|
+-----+-----------+



In [82]:
df_o.groupBy('Name').max().show()  #kunal's highest salary is 700000 , kaya's highest sal is 1000000

+-----+-----------+
| Name|max(Salary)|
+-----+-----------+
|Kunal|     700000|
|Arnav|     950000|
| Kaya|    1000000|
+-----+-----------+



In [84]:
#which department gives max salary
df_o.groupBy('Department').sum().show()

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|       IOT|    1550000|
| Big Data |    1850000|
|  Data Sci|    1850000|
|      ENTC|     600000|
+----------+-----------+



In [86]:
df_o.groupBy('Department').mean().show()

+----------+-----------------+
|Department|      avg(Salary)|
+----------+-----------------+
|       IOT|516666.6666666667|
| Big Data |         925000.0|
|  Data Sci|         925000.0|
|      ENTC|         600000.0|
+----------+-----------------+



In [88]:
#num of people in each department
df_o.groupBy('Department').count().show()

+----------+-----+
|Department|count|
+----------+-----+
|       IOT|    3|
| Big Data |    2|
|  Data Sci|    2|
|      ENTC|    1|
+----------+-----+



In [90]:
df_o.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|    5850000|
+-----------+



#### ML

In [93]:
path=r'C:\Users\Admin\Desktop\test4.csv'
dfr=spark.read.csv(path,header=True,inferSchema=True)
dfr.show()

+--------+---+----------+------+
|    Name|Age|Experience|Salary|
+--------+---+----------+------+
|   Aryan| 31|        10| 30000|
|   Dhruv| 30|         8| 25000|
|   Ishan| 29|         4| 20000|
| Natasha| 24|         3| 20000|
|    Ojas| 21|         1| 15000|
|Shivanya| 23|         2| 18000|
+--------+---+----------+------+



In [95]:
dfr.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [97]:
dfr.columns #list of cols . columns is not a func/method

['Name', 'Age', 'Experience', 'Salary']

In [103]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["Age","Experience"],outputCol="Independent features")
                                    

In [105]:
#take the DataFrame dfr and apply the VectorAssembler transformation
output=featureassembler.transform(dfr)
output.show()

+--------+---+----------+------+--------------------+
|    Name|Age|Experience|Salary|Independent features|
+--------+---+----------+------+--------------------+
|   Aryan| 31|        10| 30000|         [31.0,10.0]|
|   Dhruv| 30|         8| 25000|          [30.0,8.0]|
|   Ishan| 29|         4| 20000|          [29.0,4.0]|
| Natasha| 24|         3| 20000|          [24.0,3.0]|
|    Ojas| 21|         1| 15000|          [21.0,1.0]|
|Shivanya| 23|         2| 18000|          [23.0,2.0]|
+--------+---+----------+------+--------------------+



In [115]:
output.columns

['Name', 'Age', 'Experience', 'Salary', 'Independent features']

In [117]:
fi_data=output.select("Independent features","Salary")
fi_data.show()

+--------------------+------+
|Independent features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [121]:
from pyspark.ml.regression import LinearRegression
train_data,test_data=fi_data.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent features',labelCol='Salary')
regressor=regressor.fit(train_data)

In [123]:
regressor.coefficients

DenseVector([-383.9733, 1711.1853])

In [125]:
regressor.intercept

23998.330550919385

In [127]:
predict=regressor.evaluate(test_data)

In [129]:
predict.predictions.show()

+--------------------+------+------------------+
|Independent features|Salary|        prediction|
+--------------------+------+------------------+
|          [21.0,1.0]| 15000| 17646.07679465793|
|          [23.0,2.0]| 18000|18589.315525876566|
+--------------------+------+------------------+

