- [Pyspark-With-Python](https://github.com/krishnaik06/Pyspark-With-Python)
- [Pyspark Docs](https://spark.apache.org/docs/latest/api/python/getting_started/install.html#python-version-supported)

# Part 1

In [None]:
!pip install pandas pyspark

In [5]:
import pyspark
import pandas as pd

In [6]:
pd.read_csv('../asset/spotify_history.csv').head()

Unnamed: 0,endTime,artistName,trackName,msPlayed
0,2021-11-09 13:20,24kGoldn,Mood (feat. iann dior),61970
1,2021-11-11 19:02,Drake,Fair Trade (with Travis Scott),270006
2,2021-11-16 11:17,Liga/Desliga,"PEGADINHAS | Liga - Terça, 16 de Novembro",161132
3,2021-11-16 11:18,Kygo,Undeniable (feat. X Ambassadors),6360
4,2021-11-16 11:21,Masego,Garden Party,17293


In [7]:
# Start Spark Session
from pyspark.sql import SparkSession

In [8]:
spark = SparkSession.builder.appName('FirstSession').getOrCreate()

22/11/15 19:02:21 WARN Utils: Your hostname, not resolves to a loopback address: 127.0.1.1; using 192.168.15.156 instead (on interface wlp1s0)
22/11/15 19:02:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/15 19:02:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
spark

In [11]:
df_pyspark = spark.read.csv('../asset/spotify_history.csv', header=True) # %timeit: 359 ms
#df_pyspark = spark.read.option('header', 'true').csv('../asset/spotify_history.csv') # %timeit: 432 ms
df_pyspark.show(5)

+----------------+------------+--------------------+--------+
|         endTime|  artistName|           trackName|msPlayed|
+----------------+------------+--------------------+--------+
|2021-11-09 13:20|    24kGoldn|Mood (feat. iann ...|   61970|
|2021-11-11 19:02|       Drake|Fair Trade (with ...|  270006|
|2021-11-16 11:17|Liga/Desliga|PEGADINHAS | Liga...|  161132|
|2021-11-16 11:18|        Kygo|Undeniable (feat....|    6360|
|2021-11-16 11:21|      Masego|        Garden Party|   17293|
+----------------+------------+--------------------+--------+
only showing top 5 rows



In [12]:
df_pyspark.printSchema()

root
 |-- endTime: string (nullable = true)
 |-- artistName: string (nullable = true)
 |-- trackName: string (nullable = true)
 |-- msPlayed: string (nullable = true)
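
Every column in the schema above is a string because the CSV was read without `inferSchema=True`. As a rough illustration of what schema inference involves, the sketch below tries progressively wider types on sample values from this dataset. It is a simplified plain-Python model, not Spark's actual inference logic; the helper `infer_type` and the timestamp format are assumptions made for this example.

```python
from datetime import datetime


def infer_type(values):
    """Return the narrowest type name that parses every value (toy model)."""

    def all_parse(parser):
        try:
            for value in values:
                parser(value)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    # Assumed format, matching the endTime values shown in this notebook
    if all_parse(lambda v: datetime.strptime(v, "%Y-%m-%d %H:%M")):
        return "timestamp"
    return "string"


# Sample values copied from the first rows of the dataset
columns = {
    "endTime": ["2021-11-09 13:20", "2021-11-11 19:02"],
    "artistName": ["24kGoldn", "Drake"],
    "msPlayed": ["61970", "270006"],
}

for name, values in columns.items():
    print(name, "->", infer_type(values))
```

Inference needs an extra pass over the data, which is one reason it is opt-in when reading a CSV.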



# Part 2

- PySpark Dataframe
- Reading The Dataset
- Checking the Data Types of the Columns (Schema)
- Selecting Columns And Indexing
- Using the Describe Option, Similar to Pandas
- Adding Columns
- Dropping columns
- Renaming Columns

In [1]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('SecondSession').getOrCreate()

In [4]:
spark

In [20]:
# Read dataset
df_pyspark = spark.read.option('header', 'true').csv('../asset/spotify_history.csv', inferSchema=True)
# show() and printSchema() print their output and return None, so no print() is needed
df_pyspark.show(5)
df_pyspark.printSchema()

+-------------------+------------+--------------------+--------+
|            endTime|  artistName|           trackName|msPlayed|
+-------------------+------------+--------------------+--------+
|2021-11-09 13:20:00|    24kGoldn|Mood (feat. iann ...|   61970|
|2021-11-11 19:02:00|       Drake|Fair Trade (with ...|  270006|
|2021-11-16 11:17:00|Liga/Desliga|PEGADINHAS | Liga...|  161132|
|2021-11-16 11:18:00|        Kygo|Undeniable (feat....|    6360|
|2021-11-16 11:21:00|      Masego|        Garden Party|   17293|
+-------------------+------------+--------------------+--------+
only showing top 5 rows

root
 |-- endTime: timestamp (nullable = true)
 |-- artistName: string (nullable = true)
 |-- trackName: string (nullable = true)
 |-- msPlayed: integer (nullable = true)


A **DataFrame** is a data structure that supports various kinds of operations.

In [25]:
# Get Columns
df_pyspark.columns

['endTime', 'artistName', 'trackName', 'msPlayed']

In [26]:
df_pyspark.head(3)

[Row(endTime=datetime.datetime(2021, 11, 9, 13, 20), artistName='24kGoldn', trackName='Mood (feat. iann dior)', msPlayed=61970),
 Row(endTime=datetime.datetime(2021, 11, 11, 19, 2), artistName='Drake', trackName='Fair Trade (with Travis Scott)', msPlayed=270006),
 Row(endTime=datetime.datetime(2021, 11, 16, 11, 17), artistName='Liga/Desliga', trackName='PEGADINHAS | Liga - Terça, 16 de Novembro', msPlayed=161132)]

In [28]:
# Select column
df_pyspark.select('artistName').show(3)

+------------+
|  artistName|
+------------+
|    24kGoldn|
|       Drake|
|Liga/Desliga|
+------------+
only showing top 3 rows



In [29]:
df_pyspark.select(['artistName', 'trackName']).show(3)

+------------+--------------------+
|  artistName|           trackName|
+------------+--------------------+
|    24kGoldn|Mood (feat. iann ...|
|       Drake|Fair Trade (with ...|
|Liga/Desliga|PEGADINHAS | Liga...|
+------------+--------------------+
only showing top 3 rows



In [30]:
df_pyspark.dtypes

[('endTime', 'timestamp'),
 ('artistName', 'string'),
 ('trackName', 'string'),
 ('msPlayed', 'int')]

In [35]:
df_pyspark.describe().show()

+-------+----------+--------------------+------------------+
|summary|artistName|           trackName|          msPlayed|
+-------+----------+--------------------+------------------+
|  count|      9044|                9044|              9044|
|   mean|    3030.0|            Infinity|109133.91287041132|
| stddev|      null|                 NaN|102590.48463600103|
|    min|      $NOT|"Surface Pressure...|                 0|
|    max| אריאל לוי|相信一切是最好的安排|           1564330|
+-------+----------+--------------------+------------------+
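
To make the summary rows above concrete, here is a minimal plain-Python sketch that computes the same five statistics for the first five `msPlayed` values shown earlier. It uses the sample standard deviation (dividing by n - 1), which is what Spark's `stddev` reports. For string columns, `min` and `max` follow string ordering rather than any numeric meaning, which is why values such as `$NOT` appear in the table; the artist list below is only the first five rows, used for illustration.

```python
import statistics

# First five msPlayed values from the dataset preview
ms_played = [61970, 270006, 161132, 6360, 17293]

summary = {
    "count": len(ms_played),
    "mean": statistics.mean(ms_played),
    "stddev": statistics.stdev(ms_played),  # sample standard deviation (n - 1)
    "min": min(ms_played),
    "max": max(ms_played),
}

for name, value in summary.items():
    print(f"{name:>6}: {value}")

# For strings, min/max compare character by character
artists = ["24kGoldn", "Drake", "Liga/Desliga", "Kygo", "Masego"]
print(min(artists), max(artists))
```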



In [44]:
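# Adding columns
# Note: msPlayed / 60_000 yields minutes, not seconds; the column is renamed to minutesPlayed further down
df_pyspark = df_pyspark.withColumn('secondsPlayed', df_pyspark['msPlayed']/60_000)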
df_pyspark.printSchema()

root
 |-- endTime: timestamp (nullable = true)
 |-- artistName: string (nullable = true)
 |-- trackName: string (nullable = true)
 |-- msPlayed: integer (nullable = true)
 |-- secondsPlayed: double (nullable = true)



In [45]:
df_pyspark.show(5)

+-------------------+------------+--------------------+--------+------------------+
|            endTime|  artistName|           trackName|msPlayed|     secondsPlayed|
+-------------------+------------+--------------------+--------+------------------+
|2021-11-09 13:20:00|    24kGoldn|Mood (feat. iann ...|   61970|1.0328333333333333|
|2021-11-11 19:02:00|       Drake|Fair Trade (with ...|  270006|            4.5001|
|2021-11-16 11:17:00|Liga/Desliga|PEGADINHAS | Liga...|  161132|2.6855333333333333|
|2021-11-16 11:18:00|        Kygo|Undeniable (feat....|    6360|             0.106|
|2021-11-16 11:21:00|      Masego|        Garden Party|   17293|0.2882166666666667|
+-------------------+------------+--------------------+--------+------------------+
only showing top 5 rows
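
Dividing `msPlayed` by 60_000 converts milliseconds to minutes, which is why the values above read 1.03, 4.50 and so on despite the `secondsPlayed` column name. The short sketch below shows both conversions side by side; the helper names are hypothetical and exist only for this example.

```python
MS_PER_SECOND = 1_000
MS_PER_MINUTE = 60_000


def ms_to_seconds(ms):
    """Convert milliseconds to seconds."""
    return ms / MS_PER_SECOND


def ms_to_minutes(ms):
    """Convert milliseconds to minutes."""
    return ms / MS_PER_MINUTE


# msPlayed values taken from the rows shown above
for ms in (61970, 270006, 6360):
    print(f"{ms} ms = {ms_to_seconds(ms)} s = {ms_to_minutes(ms)} min")
```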



In [47]:
# Dropping columns
df_pyspark = df_pyspark.drop('msPlayed')
df_pyspark.show(3)

+-------------------+------------+--------------------+------------------+
|            endTime|  artistName|           trackName|     secondsPlayed|
+-------------------+------------+--------------------+------------------+
|2021-11-09 13:20:00|    24kGoldn|Mood (feat. iann ...|1.0328333333333333|
|2021-11-11 19:02:00|       Drake|Fair Trade (with ...|            4.5001|
|2021-11-16 11:17:00|Liga/Desliga|PEGADINHAS | Liga...|2.6855333333333333|
+-------------------+------------+--------------------+------------------+
only showing top 3 rows



In [49]:
# Renaming columns
df_pyspark = df_pyspark.withColumnRenamed('secondsPlayed', 'minutesPlayed')
df_pyspark.show(3)

+-------------------+------------+--------------------+------------------+
|            endTime|  artistName|           trackName|     minutesPlayed|
+-------------------+------------+--------------------+------------------+
|2021-11-09 13:20:00|    24kGoldn|Mood (feat. iann ...|1.0328333333333333|
|2021-11-11 19:02:00|       Drake|Fair Trade (with ...|            4.5001|
|2021-11-16 11:17:00|Liga/Desliga|PEGADINHAS | Liga...|2.6855333333333333|
+-------------------+------------+--------------------+------------------+
only showing top 3 rows



# Part 3
- Dropping Columns
- Dropping Rows
- Various Parameters in Dropping Functionalities
- Handling Missing Values by Mean, Median and Mode

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ThirdSession').getOrCreate()

In [6]:
df_pyspark = spark.read.csv('../asset/spotify_history.csv', header=True, inferSchema=True)
df_pyspark.show(2)

+-------------------+----------+--------------------+--------+
|            endTime|artistName|           trackName|msPlayed|
+-------------------+----------+--------------------+--------+
|2021-11-09 13:20:00|  24kGoldn|Mood (feat. iann ...|   61970|
|2021-11-11 19:02:00|     Drake|Fair Trade (with ...|  270006|
+-------------------+----------+--------------------+--------+
only showing top 2 rows



In [10]:
# Drop rows that contain any null value (how='any' is the default)
df_pyspark.na.drop() # df_pyspark.na.drop(how='any')

# Drop rows only if all column values are null
df_pyspark.na.drop(how='all')

# Keep only rows with at least `thresh` non-null values (thresh overrides how)
df_pyspark.na.drop(how='any', thresh=2)

# Drop rows that have nulls in the specified columns
df_pyspark.na.drop(how='any', subset=['endTime'])

DataFrame[endTime: timestamp, artistName: string, trackName: string, msPlayed: int]
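
The `how`, `thresh` and `subset` parameters are easy to mix up, so here is a plain-Python model of the row-keeping rule. It is a sketch of the documented semantics, not Spark's implementation; the function `drop_na` and the sample rows are hypothetical.

```python
def drop_na(rows, how="any", thresh=None, subset=None):
    """Keep or drop rows following the na.drop rules (toy model on dicts)."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            # thresh overrides how: keep rows with at least `thresh` non-null values
            keep = non_null >= thresh
        elif how == "any":
            keep = non_null == len(cols)  # drop if any value is null
        elif how == "all":
            keep = non_null > 0  # drop only if every value is null
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept


# Hypothetical rows with different numbers of nulls
rows = [
    {"Name": "Krish", "age": 31, "Salary": 30000},
    {"Name": "Mahesh", "age": None, "Salary": 40000},
    {"Name": None, "age": 36, "Salary": None},
    {"Name": None, "age": None, "Salary": None},
]

print(len(drop_na(rows)))                  # rows with no nulls at all
print(len(drop_na(rows, how="all")))       # rows with at least one value
print(len(drop_na(rows, thresh=2)))        # rows with two or more values
print(len(drop_na(rows, subset=["age"])))  # rows where age is present
```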

In [None]:
# Fill na rows (a string fill value is only applied to string columns)
df_pyspark.na.fill('Missing Values', ['artistName'])

In [17]:
df_employees_pyspark = spark.read.csv('../Pyspark-With-Python-main/test2.csv', header=True, inferSchema=True)
df_employees_pyspark.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [18]:
# Fill na with the ml module
from pyspark.ml.feature import Imputer

cols = ['age', 'Experience', 'Salary']

# Imputer computes the mean/median of each column and uses it to replace the null values
imputer = Imputer(
    inputCols=cols,
    outputCols=[f'{col}_imputed' for col in cols]
).setStrategy('mean')

In [19]:
# Add imputation cols to df
imputer.fit(df_employees_pyspark).transform(df_employees_pyspark).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         28|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 5|         25750|
+---------+----+----------+------+-----------+------------------+--------------+
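
The imputed values in the table above can be checked by hand. The sketch below recomputes the column means in plain Python, ignoring nulls, and fills the gaps. Truncating the mean to an integer is an assumption that happens to reproduce the values shown (28, 5 and 25750); Spark's exact casting rule is not verified here, and `mean_impute` is a hypothetical helper.

```python
def mean_impute(values):
    """Replace None with the column mean, truncated to an integer."""
    present = [v for v in values if v is not None]
    mean = sum(present) / len(present)
    fill = int(mean)  # assumption: integer columns keep an integer fill value
    return [fill if v is None else v for v in values]


# Columns copied from the employees table above
age = [31, 30, 29, 24, 21, 23, None, 34, 36]
experience = [10, 8, 4, 3, 1, 2, None, 10, None]
salary = [30000, 25000, 20000, 20000, 15000, 18000, 40000, 38000, None]

print(mean_impute(age))
print(mean_impute(experience))
print(mean_impute(salary))
```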

# Part 4
- Filter Operation
- &,|,==
- ~

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('FourthSession').getOrCreate()

In [3]:
df_pyspark = spark.read.csv('../asset/spotify_history.csv', header=True, inferSchema=True)
df_pyspark.printSchema()
df_pyspark.show(5)

root
 |-- endTime: timestamp (nullable = true)
 |-- artistName: string (nullable = true)
 |-- trackName: string (nullable = true)
 |-- msPlayed: integer (nullable = true)

+-------------------+------------+--------------------+--------+
|            endTime|  artistName|           trackName|msPlayed|
+-------------------+------------+--------------------+--------+
|2021-11-09 13:20:00|    24kGoldn|Mood (feat. iann ...|   61970|
|2021-11-11 19:02:00|       Drake|Fair Trade (with ...|  270006|
|2021-11-16 11:17:00|Liga/Desliga|PEGADINHAS | Liga...|  161132|
|2021-11-16 11:18:00|        Kygo|Undeniable (feat....|    6360|
|2021-11-16 11:21:00|      Masego|        Garden Party|   17293|
+-------------------+------------+--------------------+--------+
only showing top 5 rows



In [7]:
df_pyspark.filter('msPlayed >= 200000').show(5)

+-------------------+--------------------+--------------------+--------+
|            endTime|          artistName|           trackName|msPlayed|
+-------------------+--------------------+--------------------+--------+
|2021-11-11 19:02:00|               Drake|Fair Trade (with ...|  270006|
|2021-11-16 11:34:00|           O Assunto|Inflação: a do Br...|  500371|
|2021-11-16 11:53:00|Matando Robôs Gig...|MRG 579: Arcane -...| 1143985|
|2021-11-16 12:09:00|           Lil Nas X|INDUSTRY BABY (fe...|  211999|
|2021-11-16 13:28:00|           YNW Melly|           Bang Bang|  260333|
+-------------------+--------------------+--------------------+--------+
only showing top 5 rows



In [8]:
df_pyspark.filter('msPlayed >= 1000000').select(['artistName', 'trackName', 'msPlayed']).show(5)

+--------------------+--------------------+--------+
|          artistName|           trackName|msPlayed|
+--------------------+--------------------+--------+
|Matando Robôs Gig...|MRG 579: Arcane -...| 1143985|
|Respondendo em Vo...|T2E17 — Por que s...| 1050221|
|The Joe Rogan Exp...|JRE MMA Show #130...| 1510353|
|The Joe Rogan Exp...|#1876 - Greg Fitz...| 1564330|
|Data Science Academy|Episódio 46 - Mig...| 1348179|
+--------------------+--------------------+--------+
only showing top 5 rows



In [16]:
df_pyspark.filter((df_pyspark['msPlayed']>80000) & 
                  (df_pyspark['msPlayed']<100000)).show(3)

df_pyspark.filter((df_pyspark['msPlayed']==20000) | 
                  (df_pyspark['artistName']=='Data Science Academy')).show(3)

+-------------------+-------------+--------------------+--------+
|            endTime|   artistName|           trackName|msPlayed|
+-------------------+-------------+--------------------+--------+
|2021-11-16 12:26:00|   Juice WRLD|Come & Go (with M...|   85259|
|2021-11-18 12:36:00|Clint Mansell|         Lux Aeterna|   90475|
|2021-11-22 11:26:00|     24kGoldn|Mood (feat. iann ...|   81833|
+-------------------+-------------+--------------------+--------+
only showing top 3 rows

+-------------------+--------------------+--------------------+--------+
|            endTime|          artistName|           trackName|msPlayed|
+-------------------+--------------------+--------------------+--------+
|2022-10-08 03:49:00|Data Science Academy|Episódio 46 - Mig...| 1348179|
+-------------------+--------------------+--------------------+--------+



In [20]:
# Using Not operator
df_pyspark.filter(~(df_pyspark['artistName'] == '24kGoldn')).show(5)

+-------------------+------------+--------------------+--------+
|            endTime|  artistName|           trackName|msPlayed|
+-------------------+------------+--------------------+--------+
|2021-11-11 19:02:00|       Drake|Fair Trade (with ...|  270006|
|2021-11-16 11:17:00|Liga/Desliga|PEGADINHAS | Liga...|  161132|
|2021-11-16 11:18:00|        Kygo|Undeniable (feat....|    6360|
|2021-11-16 11:21:00|      Masego|        Garden Party|   17293|
|2021-11-16 11:21:00|     Fleurie|Breathe - Legends...|  184163|
+-------------------+------------+--------------------+--------+
only showing top 5 rows
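
The filters above use `&`, `|` and `~` with parentheses around each comparison. The reason is Python's operator rules: `and`, `or` and `not` cannot be overloaded to build an expression, and `&` binds more tightly than `>` or `==`. The toy class below is a hypothetical stand-in for a column expression (it is not PySpark's `Column`) that records the expression text, which makes both effects visible.

```python
class Expr:
    """Toy column expression that only records the expression text."""

    def __init__(self, text):
        self.text = text

    def _binary(self, op, other):
        rhs = other.text if isinstance(other, Expr) else repr(other)
        return Expr(f"({self.text} {op} {rhs})")

    def __gt__(self, other):
        return self._binary(">", other)

    def __lt__(self, other):
        return self._binary("<", other)

    def __eq__(self, other):
        return self._binary("=", other)

    def __and__(self, other):
        return self._binary("AND", other)

    def __or__(self, other):
        return self._binary("OR", other)

    def __invert__(self):
        return Expr(f"(NOT {self.text})")

    def __bool__(self):
        # `and`, `or` and `not` would call this, so they cannot build expressions
        raise TypeError("use &, | and ~ instead of and, or and not")


ms_played = Expr("msPlayed")
artist = Expr("artistName")

in_range = (ms_played > 80000) & (ms_played < 100000)
either = (ms_played == 20000) | (artist == "Data Science Academy")
negated = ~(artist == "24kGoldn")

for expr in (in_range, either, negated):
    print(expr.text)

# Without parentheses Python evaluates `80000 & ms_played` first, which fails here
try:
    ms_played > 80000 & ms_played < 100000
    parsed_without_parentheses = True
except TypeError:
    parsed_without_parentheses = False
print(parsed_without_parentheses)
```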



# Part 5

In [None]:
from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('FifthSession').getOrCreate()

In [7]:
df_pyspark = spark.read.csv('../asset/spotify_history.csv', header=True, inferSchema=True)
df_pyspark.show(5)

+-------------------+------------+--------------------+--------+
|            endTime|  artistName|           trackName|msPlayed|
+-------------------+------------+--------------------+--------+
|2021-11-09 13:20:00|    24kGoldn|Mood (feat. iann ...|   61970|
|2021-11-11 19:02:00|       Drake|Fair Trade (with ...|  270006|
|2021-11-16 11:17:00|Liga/Desliga|PEGADINHAS | Liga...|  161132|
|2021-11-16 11:18:00|        Kygo|Undeniable (feat....|    6360|
|2021-11-16 11:21:00|      Masego|        Garden Party|   17293|
+-------------------+------------+--------------------+--------+
only showing top 5 rows



In [8]:
# Group by

# A group by must be applied first, followed by an aggregate function (e.g. sum, max, mean, ...)
df_pyspark.groupBy('artistName').sum().show(3)

+----------+-------------+
|artistName|sum(msPlayed)|
+----------+-------------+
| Lil Nas X|     35301200|
|THE SCOTTS|       706556|
|Snoop Dogg|      2110118|
+----------+-------------+
only showing top 3 rows



In [10]:
df_pyspark.groupBy('artistName').count().show(3)

+----------+-----+
|artistName|count|
+----------+-----+
| Lil Nas X|  283|
|THE SCOTTS|   13|
|Snoop Dogg|   18|
+----------+-----+
only showing top 3 rows
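
Conceptually, a group by followed by `sum()` or `count()` builds one bucket per key and folds each row into its bucket. The plain-Python sketch below shows that idea on four rows taken from the outputs earlier in this notebook; it models the logic only and says nothing about how Spark distributes the work.

```python
from collections import defaultdict

# (artistName, msPlayed) pairs copied from rows shown earlier
plays = [
    ("24kGoldn", 61970),
    ("Drake", 270006),
    ("Kygo", 6360),
    ("24kGoldn", 81833),
]

totals = defaultdict(int)  # artist -> sum(msPlayed)
counts = defaultdict(int)  # artist -> number of rows

for artist_name, ms in plays:
    totals[artist_name] += ms
    counts[artist_name] += 1

# Aggregating without a group by collapses everything into one value
grand_total = sum(ms for _, ms in plays)

for artist_name in totals:
    print(artist_name, totals[artist_name], counts[artist_name])
print("total:", grand_total)
```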



In [12]:
# Apply an aggregate function directly to a column
df_pyspark.agg({'msPlayed': 'sum'}).show()

+-------------+
|sum(msPlayed)|
+-------------+
|    987007108|
+-------------+

22/11/16 10:12:30 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 34172714 ms exceeds timeout 120000 ms
22/11/16 10:12:31 WARN SparkContext: Killing executors is not supported by current scheduler.


# Part 6
- ML with DataFrames
- Linear regression
- Logistic regression
- Predict values

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SixthSession').getOrCreate()

In [13]:
training = spark.read.csv('../Pyspark-With-Python-main/test1.csv', header=True, inferSchema=True)
training.show()
training.printSchema()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [None]:
# Group the independent features into a single column.
# A VectorAssembler makes sure all the features are grouped together:
# [age, Experience] ---> new feature ---> independent features

In [14]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols=["age", "Experience"], outputCol="Independent Features")
# inputCols parameter: the columns to group together -> they need a numerical representation
# outputCol parameter: the name of the new vector column that holds the grouped features
# ! Strings can be converted to numerical values with the pyspark.ml libs

In [15]:
output = featureassembler.transform(training)
output.show()

# Basically, the two columns are combined into one column

+---------+---+----------+------+--------------------+
|     Name|age|Experience|Salary|Independent Features|
+---------+---+----------+------+--------------------+
|    Krish| 31|        10| 30000|         [31.0,10.0]|
|Sudhanshu| 30|         8| 25000|          [30.0,8.0]|
|    Sunny| 29|         4| 20000|          [29.0,4.0]|
|     Paul| 24|         3| 20000|          [24.0,3.0]|
|   Harsha| 21|         1| 15000|          [21.0,1.0]|
|  Shubham| 23|         2| 18000|          [23.0,2.0]|
+---------+---+----------+------+--------------------+
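
The `Independent Features` column above is just the selected input columns packed into one vector of floats per row. A minimal plain-Python sketch of that step, using two rows from the table, follows; `assemble` is a hypothetical helper, not part of PySpark.

```python
def assemble(row, input_cols):
    """Pack the chosen numeric columns of one row into a list of floats."""
    return [float(row[col]) for col in input_cols]


# Two rows copied from the training table
rows = [
    {"Name": "Krish", "age": 31, "Experience": 10, "Salary": 30000},
    {"Name": "Sudhanshu", "age": 30, "Experience": 8, "Salary": 25000},
]

for row in rows:
    features = assemble(row, ["age", "Experience"])
    print(row["Name"], features)
```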



In [16]:
# The features needed for training: Salary is the desired output and "Independent Features" is the input
finalized_data = output.select("Independent Features", "Salary")
finalized_data.show()

# Salary is the dependent feature

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [31.0,10.0]| 30000|
|          [30.0,8.0]| 25000|
|          [29.0,4.0]| 20000|
|          [24.0,3.0]| 20000|
|          [21.0,1.0]| 15000|
|          [23.0,2.0]| 18000|
+--------------------+------+



In [35]:
# Train test split
from pyspark.ml.regression import LinearRegression
splits = finalized_data.randomSplit([0.75, 0.25])
train_data = splits[0]
test_data = splits[1]

# featuresCol: name of the vector column that holds the input features
# labelCol: name of the column that holds the output (label) to predict
regressor = LinearRegression(featuresCol='Independent Features', labelCol='Salary')
regressor = regressor.fit(train_data)

22/11/16 13:25:50 WARN Instrumentation: [e96da753] regParam is zero, which might cause numerical instability and overfitting.


In [36]:
print("coefficients: ", regressor.coefficients)
print("intercept: ", regressor.intercept)

coefficients:  [2200.0000000008085,-1400.0000000010095]
intercept:  -29800.000000016233


In [37]:
pred_results=regressor.evaluate(test_data)
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [24.0,3.0]| 20000|18800.000000000146|
|          [29.0,4.0]| 20000| 28400.00000000318|
|         [31.0,10.0]| 30000|24399.999999998727|
+--------------------+------+------------------+
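
Each prediction above is the intercept plus the weighted sum of the features. The sketch below redoes that arithmetic in plain Python with the fitted coefficients rounded to whole numbers (2200, -1400 and -29800), which is an approximation of the printed values.

```python
# Rounded from the fitted coefficients and intercept printed earlier
coefficients = [2200.0, -1400.0]  # weights for [age, Experience]
intercept = -29800.0


def predict(features):
    """Linear model: intercept + sum of weight * feature."""
    return intercept + sum(w * x for w, x in zip(coefficients, features))


# Feature vectors from the test rows in the prediction table
for features in ([24.0, 3.0], [29.0, 4.0], [31.0, 10.0]):
    print(features, "->", predict(features))
```

The results match the `prediction` column up to the rounding of the coefficients.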



In [38]:
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(5066.666666668102, 34453333.333355784)
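
Both metrics above can be recomputed from the three test rows: the mean absolute error averages the absolute differences, and the mean squared error averages their squares. The sketch uses the predictions rounded to whole numbers, so the results agree with the reported values only up to that rounding.

```python
# Actual Salary values and rounded predictions from the test rows
actual = [20000, 20000, 30000]
predicted = [18800.0, 28400.0, 24400.0]

errors = [p - a for a, p in zip(actual, predicted)]

mean_absolute_error = sum(abs(e) for e in errors) / len(errors)
mean_squared_error = sum(e * e for e in errors) / len(errors)

print(mean_absolute_error, mean_squared_error)
```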

# Part 7

In [37]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SeventhSession').getOrCreate()

[spotify dictionary dataset](https://www.kaggle.com/datasets/maharshipandya/-spotify-tracks-dataset)

In [35]:
df_spotify = spark.read.csv('../asset/SpotifyFeatures.csv', header=True, inferSchema=True)
df_spotify.printSchema()

root
 |-- genre: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- key: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- tempo: string (nullable = true)
 |-- time_signature: string (nullable = true)
 |-- valence: string (nullable = true)





In [34]:
df_spotify = df_spotify.select(['artist_name','track_name','popularity','energy','danceability','liveness','valence','tempo','duration_ms'])
df_spotify.show(5)

+-----------------+--------------------+----------+------+------------+--------+-------+-------+-----------+
|      artist_name|          track_name|popularity|energy|danceability|liveness|valence|  tempo|duration_ms|
+-----------------+--------------------+----------+------+------------+--------+-------+-------+-----------+
|   Henri Salvador|C'est beau de fai...|         0|  0.91|       0.389|   0.346|  0.814|166.969|      99373|
|Martin & les fées|Perdu d'avance (p...|         1| 0.737|        0.59|   0.151|  0.816|174.003|     137373|
|  Joseph Williams|Don't Let Me Be L...|         3| 0.131|       0.663|   0.103|  0.368| 99.488|     170267|
|   Henri Salvador|Dis-moi Monsieur ...|         0| 0.326|        0.24|  0.0985|  0.227|171.758|     152427|
|     Fabien Nataf|           Ouverture|         4| 0.225|       0.331|   0.202|   0.39|140.576|      82625|
+-----------------+--------------------+----------+------+------------+--------+-------+-------+-----------+
only showing top 5 rows

In [36]:
df = spark.read.csv('../Pyspark-With-Python-main/tips.csv', header=True, inferSchema=True)
df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [38]:
# Convert string category features into numerical features
from pyspark.ml.feature import StringIndexer

In [41]:
indexer = StringIndexer(inputCol='sex', outputCol='sex_indexed')
df_r = indexer.fit(df).transform(df)
df_r.show(5)


+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
+----------+----+------+------+---+------+----+-----------+
only showing top 5 rows
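
By default `StringIndexer` orders labels by descending frequency, so the most common label gets index 0.0. That is consistent with `Male` mapping to 0.0 and `Female` to 1.0 above. The plain-Python sketch below models that rule on the five rows shown; breaking ties alphabetically is an assumption of this sketch, and `fit_string_indexer` is a hypothetical helper.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Map each label to an index, most frequent label first (toy model)."""
    counts = Counter(labels)
    # Sort by descending count, then alphabetically (assumed tie-break)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


# The sex column of the five rows shown above
sex = ["Female", "Male", "Male", "Male", "Female"]

mapping = fit_string_indexer(sex)
indexed = [mapping[value] for value in sex]

print(mapping)
print(indexed)
```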



In [43]:
indexer = StringIndexer(inputCols=['smoker','day','time'], outputCols=['smoker_indexed','day_indexed','time_indexed'])
df_r = indexer.fit(df_r).transform(df_r)
df_r.show(5)

+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|           0.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|           0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|           0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|           0.0|        1.0|         0.0|
+----------+----+------+------+---+------+----+-----------+--------------+-----------+------------+
only showing top 5 rows



In [47]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(
    inputCols=['tip','size','sex_indexed','smoker_indexed','day_indexed','time_indexed'], 
    outputCol='Independent Features'
)
output = featureassembler.transform(df_r)
output.select('Independent Features').show(5)

+--------------------+
|Independent Features|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
+--------------------+
only showing top 5 rows



In [48]:
# Dependent feature: total_bill
finalized_data = output.select(['Independent Features', 'total_bill'])
finalized_data.show(5)

+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
+--------------------+----------+
only showing top 5 rows



In [49]:
from pyspark.ml.regression import LinearRegression

splits = finalized_data.randomSplit([0.75,0.25])
train_data = splits[0]
test_data = splits[1]

regressor = LinearRegression(featuresCol='Independent Features', labelCol='total_bill')
regressor = regressor.fit(train_data)

22/11/16 20:17:17 WARN Instrumentation: [d3c52da3] regParam is zero, which might cause numerical instability and overfitting.


In [53]:
print(
    "coefficients: ", regressor.coefficients, "\n"
    "intercept: ", regressor.intercept
)

coefficients:  [2.8212709416108286,3.2744188449046723,-0.543140804589959,1.9517834155603933,-0.2829488601637782,-1.3024082933796186] 
intercept:  2.8540073861909496


In [54]:
# Predictions
pred_results = regressor.evaluate(test_data)
pred_results.predictions.show()

# total_bill is the actual value and prediction is the predicted value

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.47,2.0])|     10.77|13.550113360168211|
|(6,[0,1],[1.97,2.0])|     12.02|14.960748830973628|
| (6,[0,1],[2.0,2.0])|     12.69| 15.04538695922195|
|(6,[0,1],[2.34,4.0])|     17.81| 22.55345676917898|
|(6,[0,1],[2.64,3.0])|     17.59|20.125419206757556|
|(6,[0,1],[2.72,2.0])|     13.28| 17.07670203718175|
|(6,[0,1],[3.15,3.0])|     20.08|21.564267386979076|
|(6,[0,1],[5.92,3.0])|     29.03| 29.37918789524107|
|[1.0,1.0,1.0,0.0,...|      7.25|  8.40655636811649|
|[1.0,2.0,0.0,1.0,...|      12.6|14.175899433171516|
|[1.0,2.0,1.0,1.0,...|      5.75|12.783912048090222|
|[1.48,2.0,0.0,0.0...|      8.52|11.710020055877145|
|[1.5,2.0,0.0,0.0,...|     19.08| 11.76644547470936|
|[1.5,2.0,0.0,0.0,...|     12.46|  12.7859049079252|
|[1.5,2.0,0.0,1.0,...|     15.69|15.303586043813151|
|[1.5,2.0,0.0,1.0,...|     12.03|14.7376883234
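
Some feature vectors in the table above are printed as `(6,[0,1],[1.47,2.0])`. That is the sparse notation: vector size, the indices of the non-zero entries, and their values. The sketch below expands the first row into a dense list and recomputes its prediction from the coefficients and intercept printed earlier; `to_dense` is a hypothetical helper written for this example.

```python
def to_dense(size, indices, values):
    """Expand a sparse (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# Coefficients and intercept as printed for the fitted model
coefficients = [
    2.8212709416108286, 3.2744188449046723, -0.543140804589959,
    1.9517834155603933, -0.2829488601637782, -1.3024082933796186,
]
intercept = 2.8540073861909496

# First row of the prediction table: tip=1.47, size=2.0, all indexed columns 0
features = to_dense(6, [0, 1], [1.47, 2.0])
prediction = intercept + sum(w * x for w, x in zip(coefficients, features))

print(features)
print(prediction)
```

The result agrees with the first `prediction` value in the table to within floating-point rounding.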

In [55]:
# Performance Metrics
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError

(0.6697109355118465, 3.8239946020649587, 29.462956306379812)
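
The first value above is R², the share of the variance in `total_bill` that the model explains: one minus the ratio of the residual sum of squares to the total sum of squares. A minimal plain-Python sketch of the formula follows; the sample numbers are hypothetical and are not taken from the tips dataset.

```python
def r2_score(actual, predicted):
    """R^2 = 1 - SS_res / SS_tot."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1 - ss_res / ss_tot


# Hypothetical values, for illustration only
actual = [10.0, 20.0, 30.0]
predicted = [12.0, 18.0, 31.0]

print(r2_score(actual, predicted))
```

A perfect model scores 1.0, and a model that always predicts the mean of the actual values scores 0.0.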