In [13]:
!pip install pyspark



In [1]:
import pyspark

# Start a new Spark session named 'Practice', or reuse the one already running
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName('Practice')
spark = session_builder.getOrCreate()

In [2]:
# Bare expression: the notebook displays the session object's representation
spark

# PySpark DataFrame
# Reading the Dataset
# Checking the DataType of the Column
# Selecting Columns and Indexing
# Check Describe Option Similar to Pandas 
# Adding, Dropping and Renaming Columns

In [40]:
## Reading the data
# By default Spark reads every column as a string; inferSchema=True makes it
# infer the actual data type of each column instead.
# NOTE(review): absolute, machine-specific path -- this cell only runs on a
# machine where that exact file exists.
df_Spark = spark.read.csv(r"H:\Mohan\Downloads\Top5Category.csv", header=True , inferSchema=True)
df_Spark.show()

+---+-----+--------------+
|_c0|Score|      Category|
+---+-----+--------------+
| 15|53935|        travel|
| 10|53657|       science|
|  8|52745|healthy eating|
|  1|52443|       animals|
|  2|49681|       cooking|
+---+-----+--------------+



In [41]:
# TO Find the Datatype of the dataframe similar to pandas (info)
df_Spark.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Score: integer (nullable = true)
 |-- Category: string (nullable = true)



In [42]:
# How to select specific columns (here two of them: Category and Score)
specific_column = df_Spark.select(["Category","Score"])
specific_column.show()

+--------------+-----+
|      Category|Score|
+--------------+-----+
|        travel|53935|
|       science|53657|
|healthy eating|52745|
|       animals|52443|
|       cooking|49681|
+--------------+-----+



In [43]:
# Describe option: summary statistics (count, mean, stddev, min, max) for each selected column
specific_column.describe().show()

+-------+--------+------------------+
|summary|Category|             Score|
+-------+--------+------------------+
|  count|       5|                 5|
|   mean|    null|           52492.2|
| stddev|    null|1688.7572945808402|
|    min| animals|             49681|
|    max|  travel|             53935|
+-------+--------+------------------+



In [44]:
# Adding a column to a PySpark DataFrame: "Score With Tax" is Score plus 1000
score_with_tax = df_Spark["Score"] + 1000
df_Spark = df_Spark.withColumn("Score With Tax", score_with_tax)
df_Spark.show()

+---+-----+--------------+--------------+
|_c0|Score|      Category|Score With Tax|
+---+-----+--------------+--------------+
| 15|53935|        travel|         54935|
| 10|53657|       science|         54657|
|  8|52745|healthy eating|         53745|
|  1|52443|       animals|         53443|
|  2|49681|       cooking|         50681|
+---+-----+--------------+--------------+



In [45]:
# Dropping a column from a PySpark DataFrame: removes "Score With Tax" and
# rebinds df_Spark to the resulting DataFrame
df_Spark = df_Spark.drop("Score With Tax")
df_Spark.show()

+---+-----+--------------+
|_c0|Score|      Category|
+---+-----+--------------+
| 15|53935|        travel|
| 10|53657|       science|
|  8|52745|healthy eating|
|  1|52443|       animals|
|  2|49681|       cooking|
+---+-----+--------------+



In [46]:
# Renaming a column: the auto-named '_c0' column becomes 'Rank'
df_Spark = df_Spark.withColumnRenamed('_c0','Rank')
df_Spark.show()

+----+-----+--------------+
|Rank|Score|      Category|
+----+-----+--------------+
|  15|53935|        travel|
|  10|53657|       science|
|   8|52745|healthy eating|
|   1|52443|       animals|
|   2|49681|       cooking|
+----+-----+--------------+



# Dropping Rows
# Various Parameters of the Drop Function
# Handling Missing Values by Means, Median and Mode

In [47]:
## Reading the data
# By default Spark reads every column as a string; inferSchema=True makes it
# infer the actual data type of each column instead.
# NOTE(review): absolute, machine-specific path. The same StreamData.csv path
# is read again by later cells whose displayed output differs, so the file was
# presumably edited between runs -- confirm before relying on a re-run.
df_Spark_1 = spark.read.csv(r"C:\Users\MOHANRAJ\Desktop\Data streaming analysis\StreamData.csv", header=True , inferSchema=True)
df_Spark_1.show()

+---+------+----+------------------+--------------+------+
| Id|  Name| Age|        Profession|          City|Salary|
+---+------+----+------------------+--------------+------+
|100| Kumar|  27|            Lawyer|     Riverside|  1558|
|101|  null|  39|          Musician|        Maleee|  null|
|102|Kurnal|  48|            Police|  Bahia blanca|  7612|
|103|Monesh|null|           Teacher|          null|  2451|
|104|   Raj|  51|Software Developer|     Amristsar|  3599|
|105| Mohan|  36|            Doctor|      Montreal|  null|
|106| Anish|  55|            Police|Sanfrancesisco|  3578|
|107|  null|  36|            Police|          Gaza|  2268|
|108| Priya|null|           Student|       Hamburg|  3971|
|109| Ramya|  24|          Designer|          null|  9964|
+---+------+----+------------------+--------------+------+



In [48]:
# Print each column's name, inferred data type and nullability
df_Spark_1.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Profession: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [49]:
# Dropping null values: with no arguments, only rows without any null are kept.
# The result is only displayed; df_Spark_1 itself is not reassigned.
df_Spark_1.na.drop().show()

+---+------+---+------------------+--------------+------+
| Id|  Name|Age|        Profession|          City|Salary|
+---+------+---+------------------+--------------+------+
|100| Kumar| 27|            Lawyer|     Riverside|  1558|
|102|Kurnal| 48|            Police|  Bahia blanca|  7612|
|104|   Raj| 51|Software Developer|     Amristsar|  3599|
|106| Anish| 55|            Police|Sanfrancesisco|  3578|
+---+------+---+------------------+--------------+------+



In [50]:
# Drop parameters: how, thresh and subset.
# thresh=5 keeps only the rows that have at least 5 non-null values. When
# thresh is given it overrides `how`, so the ignored how="any" argument is
# left out to avoid suggesting that both conditions apply.
df_Spark_1.na.drop(thresh=5).show()

+---+------+----+------------------+--------------+------+
| Id|  Name| Age|        Profession|          City|Salary|
+---+------+----+------------------+--------------+------+
|100| Kumar|  27|            Lawyer|     Riverside|  1558|
|102|Kurnal|  48|            Police|  Bahia blanca|  7612|
|104|   Raj|  51|Software Developer|     Amristsar|  3599|
|105| Mohan|  36|            Doctor|      Montreal|  null|
|106| Anish|  55|            Police|Sanfrancesisco|  3578|
|107|  null|  36|            Police|          Gaza|  2268|
|108| Priya|null|           Student|       Hamburg|  3971|
|109| Ramya|  24|          Designer|          null|  9964|
+---+------+----+------------------+--------------+------+



In [51]:
# To drop rows based on nulls in particular columns only, use the subset
# parameter: here only rows with a null Salary are removed.
df_Spark_1.na.drop(how = "any", subset=["Salary"]).show()

+---+------+----+------------------+--------------+------+
| Id|  Name| Age|        Profession|          City|Salary|
+---+------+----+------------------+--------------+------+
|100| Kumar|  27|            Lawyer|     Riverside|  1558|
|102|Kurnal|  48|            Police|  Bahia blanca|  7612|
|103|Monesh|null|           Teacher|          null|  2451|
|104|   Raj|  51|Software Developer|     Amristsar|  3599|
|106| Anish|  55|            Police|Sanfrancesisco|  3578|
|107|  null|  36|            Police|          Gaza|  2268|
|108| Priya|null|           Student|       Hamburg|  3971|
|109| Ramya|  24|          Designer|          null|  9964|
+---+------+----+------------------+--------------+------+



In [52]:
# Filling null values: nulls in the Name and City columns are replaced with
# the string "NAN"; other columns are left untouched.
df_Spark_1.na.fill("NAN",['Name','City']).show()

+---+------+----+------------------+--------------+------+
| Id|  Name| Age|        Profession|          City|Salary|
+---+------+----+------------------+--------------+------+
|100| Kumar|  27|            Lawyer|     Riverside|  1558|
|101|   NAN|  39|          Musician|        Maleee|  null|
|102|Kurnal|  48|            Police|  Bahia blanca|  7612|
|103|Monesh|null|           Teacher|           NAN|  2451|
|104|   Raj|  51|Software Developer|     Amristsar|  3599|
|105| Mohan|  36|            Doctor|      Montreal|  null|
|106| Anish|  55|            Police|Sanfrancesisco|  3578|
|107|   NAN|  36|            Police|          Gaza|  2268|
|108| Priya|null|           Student|       Hamburg|  3971|
|109| Ramya|  24|          Designer|           NAN|  9964|
+---+------+----+------------------+--------------+------+



In [53]:
# Imputer fills missing values in numeric columns with a column statistic.
# The strategy used here is "mean"; median and mode are the alternatives
# noted for this cell.

from pyspark.ml.feature import Imputer

# The imputed values are written back to the same columns, so the column list
# is defined once and passed as both inputCols and outputCols.
impute_cols = ['Age','Salary']

imputer = Imputer(
                    inputCols = impute_cols,
                    outputCols = impute_cols
                    ).setStrategy("mean")

# Fit the imputer on df_Spark_1, then transform it to fill in the nulls
df_Spark_1 = imputer.fit(df_Spark_1).transform(df_Spark_1)
df_Spark_1.show()

+---+------+---+------------------+--------------+------+
| Id|  Name|Age|        Profession|          City|Salary|
+---+------+---+------------------+--------------+------+
|100| Kumar| 27|            Lawyer|     Riverside|  1558|
|101|  null| 39|          Musician|        Maleee|  4375|
|102|Kurnal| 48|            Police|  Bahia blanca|  7612|
|103|Monesh| 39|           Teacher|          null|  2451|
|104|   Raj| 51|Software Developer|     Amristsar|  3599|
|105| Mohan| 36|            Doctor|      Montreal|  4375|
|106| Anish| 55|            Police|Sanfrancesisco|  3578|
|107|  null| 36|            Police|          Gaza|  2268|
|108| Priya| 39|           Student|       Hamburg|  3971|
|109| Ramya| 24|          Designer|          null|  9964|
+---+------+---+------------------+--------------+------+



# Filter Operation
# Operators
# ~ inverse filter

In [54]:
# Load the data used for the filter examples, inferring column types.
# NOTE(review): absolute, machine-specific path, and the same file path as the
# earlier StreamData.csv read even though the displayed rows differ -- confirm.
df_spark_2  = spark.read.csv(r"C:\Users\MOHANRAJ\Desktop\Data streaming analysis\StreamData.csv",header =True, inferSchema= True)
df_spark_2.show()

+---+------+---+------------------+--------------+------+
| Id|  Name|Age|        Profession|          City|Salary|
+---+------+---+------------------+--------------+------+
|100| Kumar| 27|            Lawyer|     Riverside|  1558|
|101| Priya| 39|          Musician|        Maleee|  2268|
|102|Kurnal| 48|            Police|  Bahia blanca|  7612|
|103|Monesh| 36|           Teacher|     Riverside|  2451|
|104|   Raj| 51|Software Developer|     Amristsar|  3599|
|105| Mohan| 36|            Doctor|      Montreal|  2268|
|106| Anish| 55|            Police|Sanfrancesisco|  3578|
|107| Priya| 36|            Police|          Gaza|  2268|
|108| Priya| 36|           Student|       Hamburg|  3971|
|109| Ramya| 24|          Designer|     Riverside|  9964|
+---+------+---+------------------+--------------+------+



In [56]:
# Filter Operation

# Salary greater than 5000 (the condition is passed as a SQL expression string)
df_spark_2.filter("Salary>5000").show()

+---+------+---+----------+------------+------+
| Id|  Name|Age|Profession|        City|Salary|
+---+------+---+----------+------------+------+
|102|Kurnal| 48|    Police|Bahia blanca|  7612|
|109| Ramya| 24|  Designer|   Riverside|  9964|
+---+------+---+----------+------------+------+



In [58]:
# Same salary filter, keeping only the Name, Profession and Salary columns
high_earners = df_spark_2.filter("Salary>5000")
high_earners.select(['Name','Profession','Salary']).show()

+------+----------+------+
|  Name|Profession|Salary|
+------+----------+------+
|Kurnal|    Police|  7612|
| Ramya|  Designer|  9964|
+------+----------+------+



# Groupby and Aggregate

In [59]:
# Load the data used for the groupBy / aggregate examples, inferring column types.
# NOTE(review): absolute, machine-specific path, and the same file path as the
# earlier StreamData.csv reads even though the displayed rows differ -- confirm.
df_spark_3  = spark.read.csv(r"C:\Users\MOHANRAJ\Desktop\Data streaming analysis\StreamData.csv",header =True, inferSchema= True)
df_spark_3.show()

+---+------+---+------------+---------+------+
| Id|  Name|Age|  Profession|     City|Salary|
+---+------+---+------------+---------+------+
|100| Kumar| 27|Data Science|  Chennai|  1558|
|101| Priya| 39|Data Analyst|Bangalore|  2268|
|102|Kurnal| 48|Data Analyst|Bangalore|  7612|
|103|Monesh| 36|Bi Developer|  Chennai|  2451|
|104|   Raj| 51|Data Science|Bangalore|  3599|
|105| Mohan| 36|Bi Developer|  Chennai|  2268|
|106| Anish| 55|Bi Developer|Bangalore|  3578|
|107| Priya| 36|Data Analyst|    Delhi|  2268|
|108| Priya| 36|Data Science|    Delhi|  3971|
|109| Ramya| 24|Data Analyst|  Chennai|  9964|
+---+------+---+------------+---------+------+



In [68]:
# Groupby: sum per Profession. sum() with no column arguments totals every
# numeric column, which is why the identifier column Id is summed as well.
df_spark_3.groupBy("Profession").sum().show()

+------------+-------+--------+-----------+
|  Profession|sum(Id)|sum(Age)|sum(Salary)|
+------------+-------+--------+-----------+
|Bi Developer|    314|     127|       8297|
|Data Analyst|    419|     147|      22112|
|Data Science|    312|     114|       9128|
+------------+-------+--------+-----------+



In [70]:
# Average of every numeric column per City
df_spark_3.groupBy("City").mean().show()

+---------+-------+--------+-----------+
|     City|avg(Id)|avg(Age)|avg(Salary)|
+---------+-------+--------+-----------+
|Bangalore| 103.25|   48.25|    4264.25|
|  Chennai| 104.25|   30.75|    4060.25|
|    Delhi|  107.5|    36.0|     3119.5|
+---------+-------+--------+-----------+



In [71]:
# Number of rows per Profession
df_spark_3.groupBy("Profession").count().show()

+------------+-----+
|  Profession|count|
+------------+-----+
|Bi Developer|    3|
|Data Analyst|    4|
|Data Science|    3|
+------------+-----+



In [75]:
# Maximum of every numeric column per Profession
df_spark_3.groupBy("Profession").max().show()

+------------+-------+--------+-----------+
|  Profession|max(Id)|max(Age)|max(Salary)|
+------------+-------+--------+-----------+
|Bi Developer|    106|      55|       3578|
|Data Analyst|    109|      48|       9964|
|Data Science|    108|      51|       3971|
+------------+-------+--------+-----------+



In [72]:
# Aggregate Function
# The dict maps a column name to the aggregate to apply, over the whole
# DataFrame (no grouping): here the total of Salary.

df_spark_3.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      39537|
+-----------+




# Machine Learning Library

In [76]:
# Load the data used for the machine-learning example, inferring column types.
# NOTE(review): absolute, machine-specific path, and the same file path as the
# earlier StreamData.csv reads even though the displayed columns differ -- confirm.
df_spark_4  = spark.read.csv(r"C:\Users\MOHANRAJ\Desktop\Data streaming analysis\StreamData.csv",header =True, inferSchema= True)
df_spark_4.show()

+---+------+----------+---+------------+---------+------+
| Id|  Name|Experiance|Age|  Profession|     City|Salary|
+---+------+----------+---+------------+---------+------+
|100| Kumar|         5| 27|Data Science|  Chennai|  2000|
|101| Priya|         7| 39|Data Analyst|Bangalore|  4000|
|102|Kurnal|         2| 48|Data Analyst|Bangalore|  6000|
|103|Monesh|         3| 36|Bi Developer|  Chennai|  8000|
|104|   Raj|         5| 51|Data Science|Bangalore| 10000|
|105| Mohan|        10| 36|Bi Developer|  Chennai| 12000|
|106| Anish|         6| 55|Bi Developer|Bangalore| 14000|
|107| Priya|         6| 36|Data Analyst|    Delhi| 16000|
|108| Priya|         2| 36|Data Science|    Delhi| 18000|
|109| Ramya|         1| 24|Data Analyst|  Chennai| 20000|
+---+------+----------+---+------------+---------+------+



In [77]:
# Print each column's name, inferred data type and nullability
df_spark_4.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Experiance: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Profession: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [79]:
from pyspark.ml.feature import VectorAssembler
# Combine the Experiance and Age columns into a single vector column named
# 'Independent feature'
featureassembler = VectorAssembler(inputCols=['Experiance','Age'],outputCol='Independent feature')

# transform() appends the assembled vector column to df_spark_4's existing columns
output = featureassembler.transform(df_spark_4)
output.show()

+---+------+----------+---+------------+---------+------+-------------------+
| Id|  Name|Experiance|Age|  Profession|     City|Salary|Independent feature|
+---+------+----------+---+------------+---------+------+-------------------+
|100| Kumar|         5| 27|Data Science|  Chennai|  2000|         [5.0,27.0]|
|101| Priya|         7| 39|Data Analyst|Bangalore|  4000|         [7.0,39.0]|
|102|Kurnal|         2| 48|Data Analyst|Bangalore|  6000|         [2.0,48.0]|
|103|Monesh|         3| 36|Bi Developer|  Chennai|  8000|         [3.0,36.0]|
|104|   Raj|         5| 51|Data Science|Bangalore| 10000|         [5.0,51.0]|
|105| Mohan|        10| 36|Bi Developer|  Chennai| 12000|        [10.0,36.0]|
|106| Anish|         6| 55|Bi Developer|Bangalore| 14000|         [6.0,55.0]|
|107| Priya|         6| 36|Data Analyst|    Delhi| 16000|         [6.0,36.0]|
|108| Priya|         2| 36|Data Science|    Delhi| 18000|         [2.0,36.0]|
|109| Ramya|         1| 24|Data Analyst|  Chennai| 20000|         [1.0,24.0]|
+---+------+----------+---+------------+---------+------+-------------------+

In [82]:
# Keep only the assembled feature vector and the Salary column for modelling
Final_Data=output.select('Independent feature','Salary')
Final_Data.show()

+-------------------+------+
|Independent feature|Salary|
+-------------------+------+
|         [5.0,27.0]|  2000|
|         [7.0,39.0]|  4000|
|         [2.0,48.0]|  6000|
|         [3.0,36.0]|  8000|
|         [5.0,51.0]| 10000|
|        [10.0,36.0]| 12000|
|         [6.0,55.0]| 14000|
|         [6.0,36.0]| 16000|
|         [2.0,36.0]| 18000|
|         [1.0,24.0]| 20000|
+-------------------+------+



In [83]:
# Splitting data into train and test data (roughly 70% / 30%).
# A fixed seed makes the random split, and therefore the fitted model and the
# numbers derived from it, repeatable across kernel restarts.
train_data,test_data = Final_Data.randomSplit([0.70,0.30], seed=42)

# Linear Regression
from pyspark.ml.regression import LinearRegression
# Keep the unfitted estimator under its own name so that `Model` only ever
# refers to the fitted model.
regressor = LinearRegression(featuresCol="Independent feature",labelCol='Salary')
Model = regressor.fit(train_data)

In [84]:
# Coefficients of the fitted model, one per entry of the feature vector
Model.coefficients

DenseVector([117.6471, -588.2353])

In [86]:
# Intercept of the fitted model
Model.intercept

33058.823529411784

In [87]:
# Prediction
# NOTE: despite its name, `prediction` holds the object returned by
# evaluate(), not a DataFrame of predictions; the per-row predictions are read
# from its .predictions attribute on the next line.

prediction = Model.evaluate(test_data)
prediction.predictions.show()

+-------------------+------+------------------+
|Independent feature|Salary|        prediction|
+-------------------+------+------------------+
|         [2.0,36.0]| 18000|12117.647058823535|
|         [5.0,27.0]|  2000|17764.705882352944|
|         [5.0,51.0]| 10000| 3647.058823529409|
|         [6.0,55.0]| 14000|1411.7647058823495|
|         [7.0,39.0]|  4000|10941.176470588234|
+-------------------+------+------------------+

