#### Starting a Spark Session

In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('test check').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/10 15:37:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

#### READ A DATASET

In [4]:
# read the dataset
df_spark = spark.read.csv('test1.csv',header=True,inferSchema=True)

In [5]:
df_spark.show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
|Apoorv|  28|         8| 30000|
| Krish|  35|        10| 23000|
| Akash|  32|        11| 34000|
|  Vasu|  22|        12| 44000|
|  Ashu|NULL|      NULL| 55000|
|Saloni|  26|         6| 23000|
|  NULL|  36|        11| 45400|
+------+----+----------+------+



In [6]:
# check schema

df_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



#### COLUMN OPTIONS

In [7]:
# selecting a column

df_spark.select('Name','Experience','age').show()

+------+----------+----+
|  Name|Experience| age|
+------+----------+----+
|Apoorv|         8|  28|
| Krish|        10|  35|
| Akash|        11|  32|
|  Vasu|        12|  22|
|  Ashu|      NULL|NULL|
|Saloni|         6|  26|
|  NULL|        11|  36|
+------+----------+----+



In [8]:
df_spark.dtypes       # same as pandas

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [9]:
df_spark.describe().show()      # same as pandas

25/07/10 15:37:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-----+------------------+-----------------+------------------+
|summary| Name|               age|       Experience|            Salary|
+-------+-----+------------------+-----------------+------------------+
|  count|    6|                 6|                6|                 7|
|   mean| NULL|29.833333333333332|9.666666666666666|36342.857142857145|
| stddev| NULL| 5.455883674224247|2.250925735484551|12182.891986012892|
|    min|Akash|                22|                6|             23000|
|    max| Vasu|                36|               12|             55000|
+-------+-----+------------------+-----------------+------------------+



In [10]:
# Adding Columns

df_spark = df_spark.withColumn('Experience After 2 Years',df_spark['Experience']+2)

In [11]:
df_spark.show()

+------+----+----------+------+------------------------+
|  Name| age|Experience|Salary|Experience After 2 Years|
+------+----+----------+------+------------------------+
|Apoorv|  28|         8| 30000|                      10|
| Krish|  35|        10| 23000|                      12|
| Akash|  32|        11| 34000|                      13|
|  Vasu|  22|        12| 44000|                      14|
|  Ashu|NULL|      NULL| 55000|                    NULL|
|Saloni|  26|         6| 23000|                       8|
|  NULL|  36|        11| 45400|                      13|
+------+----+----------+------+------------------------+



In [12]:
# drop the columns

df_spark = df_spark.drop('Experience After 2 Years')

In [13]:
df_spark.show()

+------+----+----------+------+
|  Name| age|Experience|Salary|
+------+----+----------+------+
|Apoorv|  28|         8| 30000|
| Krish|  35|        10| 23000|
| Akash|  32|        11| 34000|
|  Vasu|  22|        12| 44000|
|  Ashu|NULL|      NULL| 55000|
|Saloni|  26|         6| 23000|
|  NULL|  36|        11| 45400|
+------+----+----------+------+



In [14]:
# rename the column

df_spark.withColumnRenamed('age','Age').show()

+------+----+----------+------+
|  Name| Age|Experience|Salary|
+------+----+----------+------+
|Apoorv|  28|         8| 30000|
| Krish|  35|        10| 23000|
| Akash|  32|        11| 34000|
|  Vasu|  22|        12| 44000|
|  Ashu|NULL|      NULL| 55000|
|Saloni|  26|         6| 23000|
|  NULL|  36|        11| 45400|
+------+----+----------+------+



### DROPPING NULL VALUES

In [15]:
# dropping the nan value (how='any'(if a single null is present), how='all'(if all the columns have null values))

df_spark.dropna(how='any').show()            # all the rows with null values are gone

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|Apoorv| 28|         8| 30000|
| Krish| 35|        10| 23000|
| Akash| 32|        11| 34000|
|  Vasu| 22|        12| 44000|
|Saloni| 26|         6| 23000|
+------+---+----------+------+



In [16]:
# threshold

df_spark.dropna(how='any',thresh=3).show() 

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|Apoorv| 28|         8| 30000|
| Krish| 35|        10| 23000|
| Akash| 32|        11| 34000|
|  Vasu| 22|        12| 44000|
|Saloni| 26|         6| 23000|
|  NULL| 36|        11| 45400|
+------+---+----------+------+



In [17]:
# subset

df_spark.dropna(how='any',subset=['Age']).show() 

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|Apoorv| 28|         8| 30000|
| Krish| 35|        10| 23000|
| Akash| 32|        11| 34000|
|  Vasu| 22|        12| 44000|
|Saloni| 26|         6| 23000|
|  NULL| 36|        11| 45400|
+------+---+----------+------+



#### FILLING MISSING VALUES

In [18]:
# filling the missing value

df_spark.fillna('Missing Value',['Name','Experience','age']).show()

+-------------+----+----------+------+
|         Name| age|Experience|Salary|
+-------------+----+----------+------+
|       Apoorv|  28|         8| 30000|
|        Krish|  35|        10| 23000|
|        Akash|  32|        11| 34000|
|         Vasu|  22|        12| 44000|
|         Ashu|NULL|      NULL| 55000|
|       Saloni|  26|         6| 23000|
|Missing Value|  36|        11| 45400|
+-------------+----+----------+------+



In [19]:
# filling values with mean , median or mode

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['age','Experience'],outputCols=['age_imputed','Experience_imputed'],strategy='mean')

In [20]:
# add imputation columns to df

imputer.fit(df_spark).transform(df_spark).show()


+------+----+----------+------+-----------+------------------+
|  Name| age|Experience|Salary|age_imputed|Experience_imputed|
+------+----+----------+------+-----------+------------------+
|Apoorv|  28|         8| 30000|         28|                 8|
| Krish|  35|        10| 23000|         35|                10|
| Akash|  32|        11| 34000|         32|                11|
|  Vasu|  22|        12| 44000|         22|                12|
|  Ashu|NULL|      NULL| 55000|         29|                 9|
|Saloni|  26|         6| 23000|         26|                 6|
|  NULL|  36|        11| 45400|         36|                11|
+------+----+----------+------+-----------+------------------+



#### FILTER Operations

In [21]:
spark_df = spark.read.csv('test2.csv',header=True,inferSchema=True)
spark_df.show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|Apoorv| 28|         8| 30000|
| Krish| 35|        10| 23000|
| Akash| 32|        11| 34000|
|  Vasu| 22|        12| 44000|
|  Ashu| 33|        23| 55000|
|Saloni| 26|         6| 23000|
+------+---+----------+------+



In [22]:
## salary of people less than or equal to 30000

spark_df.filter('Salary <= 30000').select(['Name','age','Experience']).show()

+------+---+----------+
|  Name|age|Experience|
+------+---+----------+
|Apoorv| 28|         8|
| Krish| 35|        10|
|Saloni| 26|         6|
+------+---+----------+



In [23]:
spark_df.filter(spark_df['Salary']<= 30000).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|Apoorv| 28|         8| 30000|
| Krish| 35|        10| 23000|
|Saloni| 26|         6| 23000|
+------+---+----------+------+



In [24]:
# AND

spark_df.filter((spark_df['Salary']<= 30000) & 
                (spark_df['age'] >= 25)).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|Apoorv| 28|         8| 30000|
| Krish| 35|        10| 23000|
|Saloni| 26|         6| 23000|
+------+---+----------+------+



In [25]:
# OR

spark_df.filter((spark_df['Salary']<= 30000) | 
                (spark_df['age'] >= 25)).show()

+------+---+----------+------+
|  Name|age|Experience|Salary|
+------+---+----------+------+
|Apoorv| 28|         8| 30000|
| Krish| 35|        10| 23000|
| Akash| 32|        11| 34000|
|  Ashu| 33|        23| 55000|
|Saloni| 26|         6| 23000|
+------+---+----------+------+



In [26]:
# NOT

spark_df.filter(~(spark_df['Salary']<= 30000)).show()

+-----+---+----------+------+
| Name|age|Experience|Salary|
+-----+---+----------+------+
|Akash| 32|        11| 34000|
| Vasu| 22|        12| 44000|
| Ashu| 33|        23| 55000|
+-----+---+----------+------+



#### PYSPARK GROUPBY AND AGGREGATE

In [27]:
df_pyspark = spark.read.csv('test3.csv',header=True,inferSchema=True)
df_pyspark.show()

+------+-------------+------+
|  Name|   Department|Salary|
+------+-------------+------+
|Apoorv| Data Science| 30000|
| Krish| Data Analyst| 23000|
| Akash| Data Science| 34000|
|  Vasu|          IoT| 44000|
|  Ashu|Data Engineer| 55000|
|Saloni|    Developer| 23000|
+------+-------------+------+



#### Group By

In [28]:
df_pyspark.groupBy('Department').mean().show()

+-------------+-----------+
|   Department|avg(Salary)|
+-------------+-----------+
|    Developer|    23000.0|
| Data Analyst|    23000.0|
|          IoT|    44000.0|
| Data Science|    32000.0|
|Data Engineer|    55000.0|
+-------------+-----------+



In [29]:
df_pyspark.groupBy('Department').count().show()

+-------------+-----+
|   Department|count|
+-------------+-----+
|    Developer|    1|
| Data Analyst|    1|
|          IoT|    1|
| Data Science|    2|
|Data Engineer|    1|
+-------------+-----+



In [30]:
df_pyspark.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|     209000|
+-----------+



In [36]:
training = spark.read.csv('test4.csv',header=True,inferSchema=True)
training.show()

+------+---+----------+------+
|  Name|Age|Exoerience|Salary|
+------+---+----------+------+
|Apoorv| 28|         9| 30000|
| Krish| 32|        12| 23000|
| Akash| 32|        11| 34000|
|  Vasu| 19|        10| 44000|
|  Ashu| 29|        14| 55000|
|Saloni| 26|        12| 23000|
+------+---+----------+------+



In [37]:
training = training.withColumnRenamed('Exoerience','Experience')

In [38]:
training.show()

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|Apoorv| 28|         9| 30000|
| Krish| 32|        12| 23000|
| Akash| 32|        11| 34000|
|  Vasu| 19|        10| 44000|
|  Ashu| 29|        14| 55000|
|Saloni| 26|        12| 23000|
+------+---+----------+------+



In [42]:
training.columns

['Name', 'Age', 'Experience', 'Salary']

In [46]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=['Age','Experience'],outputCol='Independent Feature')


In [47]:
output = featureassembler.transform(training)
output.show()

+------+---+----------+------+-------------------+
|  Name|Age|Experience|Salary|Independent Feature|
+------+---+----------+------+-------------------+
|Apoorv| 28|         9| 30000|         [28.0,9.0]|
| Krish| 32|        12| 23000|        [32.0,12.0]|
| Akash| 32|        11| 34000|        [32.0,11.0]|
|  Vasu| 19|        10| 44000|        [19.0,10.0]|
|  Ashu| 29|        14| 55000|        [29.0,14.0]|
|Saloni| 26|        12| 23000|        [26.0,12.0]|
+------+---+----------+------+-------------------+



In [48]:
output.columns

['Name', 'Age', 'Experience', 'Salary', 'Independent Feature']

In [49]:
finalized_data = output.select('Salary','Independent Feature')

In [50]:
finalized_data.show()

+------+-------------------+
|Salary|Independent Feature|
+------+-------------------+
| 30000|         [28.0,9.0]|
| 23000|        [32.0,12.0]|
| 34000|        [32.0,11.0]|
| 44000|        [19.0,10.0]|
| 55000|        [29.0,14.0]|
| 23000|        [26.0,12.0]|
+------+-------------------+



In [None]:
from pyspark.ml.regression import LinearRegression

# training the data

train_data,test_data = finalized_data.randomSplit([0.75,0.25])
regressor = LinearRegression(featuresCol='Independent Feature',labelCol='Salary')
regressor = regressor.fit(train_data)

25/07/10 17:20:06 WARN Instrumentation: [0c0c198a] regParam is zero, which might cause numerical instability and overfitting.
25/07/10 17:20:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/07/10 17:20:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [53]:
regressor.coefficients

DenseVector([-1620.1365, 4182.2526])

In [54]:
regressor.intercept

30741.979522183934

In [55]:
# Predection

pred_result = regressor.evaluate(test_data)

In [None]:
pred_result.predictions.show()

+------+-------------------+-----------------+
|Salary|Independent Feature|       prediction|
+------+-------------------+-----------------+
| 34000|        [32.0,11.0]|24902.38907849827|
+------+-------------------+-----------------+



In [65]:
pred_result.meanAbsoluteError


9097.610921501731

In [None]:
pred_result.meanSquaredError

82766524.47902758

25/07/10 18:52:49 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 907464 ms exceeds timeout 120000 ms
25/07/10 18:52:49 WARN SparkContext: Killing executors is not supported by current scheduler.
25/07/10 18:52:51 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:81)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:669)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1296)
	at o