## Pyspark Dataframe: Reference Guide: EDA,ML,Clustering,Recommendation Systems,NLP, SparkStreaming

### Creating SparkSession and load .csv

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Test').getOrCreate()

In [21]:
df = spark.read.csv('E:\\SparkScala\\advertising.csv',inferSchema=True, header=True)
#df = spark.read.json

In [5]:
df.show()

+------------------------+---+-----------+--------------------+--------------------+-----------------+----+--------------------+-------------------+-------------+
|Daily Time Spent on Site|Age|Area Income|Daily Internet Usage|       Ad Topic Line|             City|Male|             Country|          Timestamp|Clicked on Ad|
+------------------------+---+-----------+--------------------+--------------------+-----------------+----+--------------------+-------------------+-------------+
|                   68.95| 35|    61833.9|              256.09|Cloned 5thgenerat...|      Wrightburgh|   0|             Tunisia|2016-03-27 00:53:11|            0|
|                   80.23| 31|   68441.85|              193.77|Monitored nationa...|        West Jodi|   1|               Nauru|2016-04-04 01:39:02|            0|
|                   69.47| 26|   59785.94|               236.5|Organic bottom-li...|         Davidton|   0|          San Marino|2016-03-13 20:35:42|            0|
|                   74

## EDA: Exploratory Data Analysis

### Shape

In [22]:
print((df.count(), len(df.columns)))

(1000, 10)


### Filters

In [6]:
#df.filter("column == 10").select(['column1', 'column2']).show()
#df.filter(df["column"] == 10).select(['column1', 'column2']).show()

df.filter( (df["Age"] == 35) & (df["Male"] == 1) ).show()

+------------------------+---+-----------+--------------------+--------------------+-----------------+----+--------------------+-------------------+-------------+
|Daily Time Spent on Site|Age|Area Income|Daily Internet Usage|       Ad Topic Line|             City|Male|             Country|          Timestamp|Clicked on Ad|
+------------------------+---+-----------+--------------------+--------------------+-----------------+----+--------------------+-------------------+-------------+
|                   84.59| 35|   60015.57|              226.54|Streamlined non-v...|      Lake Nicole|   1|            Cameroon|2016-03-18 13:22:35|            0|
|                   67.64| 35|   51473.28|              267.01|Programmable asym...|    Phelpschester|   1|                Peru|2016-07-02 20:23:15|            0|
|                   49.81| 35|   57009.76|              120.06|Seamless real-tim...|     Ramirezhaven|   1|       Faroe Islands|2016-01-05 04:18:46|            1|
|                   76

**Filter as Dict**

In [7]:
result = df.filter( (df["Age"] == 35) & (df["Male"] == 1) ).collect() 
row = result[0]
row.asDict()

{'Daily Time Spent on Site': 84.59,
 'Age': 35,
 'Area Income': 60015.57,
 'Daily Internet Usage': 226.54,
 'Ad Topic Line': 'Streamlined non-volatile analyzer',
 'City': 'Lake Nicole',
 'Male': 1,
 'Country': 'Cameroon',
 'Timestamp': '2016-03-18 13:22:35',
 'Clicked on Ad': 0}

### Columns

**New  Column from another**

In [8]:
from pyspark.sql.functions import length

length(df['Area Income'])
         
         

Column<b'length(Area Income)'>

In [9]:
df.withColumn('Area Income*2',df['Area Income']*2).select(['Area Income', 'Area Income*2']).show()

+-----------+-------------+
|Area Income|Area Income*2|
+-----------+-------------+
|    61833.9|     123667.8|
|   68441.85|     136883.7|
|   59785.94|    119571.88|
|   54806.18|    109612.36|
|   73889.99|    147779.98|
|   59761.56|    119523.12|
|   53852.85|     107705.7|
|   24593.33|     49186.66|
|    68862.0|     137724.0|
|   55642.32|    111284.64|
|   45632.51|     91265.02|
|   62491.01|    124982.02|
|   51636.92|    103273.84|
|   51739.63|    103479.26|
|    30976.0|      61952.0|
|   52182.23|    104364.46|
|   23936.86|     47873.72|
|   71511.08|    143022.16|
|   31087.54|     62175.08|
|   23821.72|     47643.44|
+-----------+-------------+
only showing top 20 rows



**Rename Column**

In [10]:
df.withColumnRenamed('Area Income*2','Area IncomeDouble')

DataFrame[Daily Time Spent on Site: double, Age: int, Area Income: double, Daily Internet Usage: double, Ad Topic Line: string, City: string, Male: int, Country: string, Timestamp: string, Clicked on Ad: int]

### SQL Querys

**Temporay SQL view**

In [11]:
df.createOrReplaceTempView('test')

In [12]:
spark.sql("SELECT * FROM test").show()

+------------------------+---+-----------+--------------------+--------------------+-----------------+----+--------------------+-------------------+-------------+
|Daily Time Spent on Site|Age|Area Income|Daily Internet Usage|       Ad Topic Line|             City|Male|             Country|          Timestamp|Clicked on Ad|
+------------------------+---+-----------+--------------------+--------------------+-----------------+----+--------------------+-------------------+-------------+
|                   68.95| 35|    61833.9|              256.09|Cloned 5thgenerat...|      Wrightburgh|   0|             Tunisia|2016-03-27 00:53:11|            0|
|                   80.23| 31|   68441.85|              193.77|Monitored nationa...|        West Jodi|   1|               Nauru|2016-04-04 01:39:02|            0|
|                   69.47| 26|   59785.94|               236.5|Organic bottom-li...|         Davidton|   0|          San Marino|2016-03-13 20:35:42|            0|
|                   74

In [13]:
spark.sql("SELECT * FROM test WHERE Male = 1 AND City LIKE '%town%' ORDER BY Country LIMIT 10").show()

+------------------------+---+-----------+--------------------+--------------------+----------------+----+-------------+-------------------+-------------+
|Daily Time Spent on Site|Age|Area Income|Daily Internet Usage|       Ad Topic Line|            City|Male|      Country|          Timestamp|Clicked on Ad|
+------------------------+---+-----------+--------------------+--------------------+----------------+----+-------------+-------------------+-------------+
|                   78.76| 24|   46422.76|              219.98|Reactive interact...|       Joanntown|   1|      Austria|2016-01-08 08:08:47|            0|
|                   87.98| 38|   56637.59|              222.11|Focused scalable ...|    West Roytown|   1|     Cambodia|2016-03-31 10:44:46|            0|
|                   42.05| 51|   28357.27|              174.55|Configurable 24/7...|West Eduardotown|   1|       Canada|2016-06-20 14:20:52|            1|
|                   69.11| 42|   73608.99|              231.48|Face-to

 ### groupBy and agg function

In [14]:
df.groupBy("City")

<pyspark.sql.group.GroupedData at 0x1b6636d1ac8>

In [15]:
df.groupBy("City").count().show()

+-----------------+-----+
|             City|count|
+-----------------+-----+
|      Wrightburgh|    2|
|        West Jodi|    1|
|         Davidton|    1|
|   West Terrifurt|    1|
|     South Manuel|    1|
|        Jamieberg|    1|
|      Brandonstad|    1|
| Port Jefferybury|    1|
|       West Colin|    1|
|       Ramirezton|    1|
|  West Brandonton|    1|
|East Theresashire|    1|
|   West Katiefurt|    1|
|       North Tara|    1|
|     West William|    1|
|   New Travistown|    1|
|   West Dylanberg|    1|
|      Pruittmouth|    1|
|      Jessicastad|    1|
|       Millertown|    2|
+-----------------+-----+
only showing top 20 rows



In [16]:
df.agg({'Area Income':'sum'}).show()

+-------------------+
|   sum(Area Income)|
+-------------------+
|5.500000008000003E7|
+-------------------+



In [17]:
group_data = df.groupBy("City")
group_data.agg({'Area Income':'sum'}).show()

+-----------------+------------------+
|             City|  sum(Area Income)|
+-----------------+------------------+
|      Wrightburgh|127859.01000000001|
|        West Jodi|          68441.85|
|         Davidton|          59785.94|
|   West Terrifurt|          54806.18|
|     South Manuel|          73889.99|
|        Jamieberg|          59761.56|
|      Brandonstad|          53852.85|
| Port Jefferybury|          24593.33|
|       West Colin|           68862.0|
|       Ramirezton|          55642.32|
|  West Brandonton|          45632.51|
|East Theresashire|          62491.01|
|   West Katiefurt|          51636.92|
|       North Tara|          51739.63|
|     West William|           30976.0|
|   New Travistown|          52182.23|
|   West Dylanberg|          23936.86|
|      Pruittmouth|          71511.08|
|      Jessicastad|          31087.54|
|       Millertown|          87195.42|
+-----------------+------------------+
only showing top 20 rows



### Functions

In [18]:
from pyspark.sql.functions import countDistinct,avg,stddev

df.select(avg('Area Income').alias('Area Income AVG')).show()

# alias: gave a new name to the created column

+-----------------+
|  Area Income AVG|
+-----------------+
|55000.00008000003|
+-----------------+



**Selecting number of decimals**

In [19]:
from pyspark.sql.functions import format_number

#format_number: round numbers

area_avg = df.select(avg('Area Income').alias('Area Income AVG'))
area_avg.select(format_number('Area Income AVG',2).alias('Area Income AVG')).show()

+---------------+
|Area Income AVG|
+---------------+
|      55,000.00|
+---------------+



### orderBy

**Dataframe style**

In [20]:
df.orderBy(df['Area Income'].desc()).select(['Area Income', 'City']).show()

+-----------+-------------------+
|Area Income|               City|
+-----------+-------------------+
|    79484.8|        Edwardmouth|
|   79332.33|           Mataberg|
|   78520.99|        East Ronald|
|    78119.5|  Port Whitneyhaven|
|   78092.95|North Isabellaville|
|   77988.71|    West Robertside|
|   77871.75|   North Jeremyport|
|   77567.85|       Port Destiny|
|   77460.07|      Kimberlyhaven|
|   77220.42|   Lake Jenniferton|
|   77143.61|         Staceyfort|
|   76984.21|        Lake Hailey|
|   76893.84|         Garciaview|
|   76560.59|         Jordantown|
|   76480.16|      New Jamestown|
|    76435.3|          Kellytown|
|   76408.19|  Lake Jasonchester|
|   76368.31|        Timothyfurt|
|   76246.96|     North Mercedes|
|   76003.47|        North Randy|
+-----------+-------------------+
only showing top 20 rows



**SQL style**

In [21]:
df.createOrReplaceTempView('orderBytest')
spark.sql("SELECT `Area Income`, City FROM orderBytest ORDER BY `Area Income` DESC").show()

+-----------+-------------------+
|Area Income|               City|
+-----------+-------------------+
|    79484.8|        Edwardmouth|
|   79332.33|           Mataberg|
|   78520.99|        East Ronald|
|    78119.5|  Port Whitneyhaven|
|   78092.95|North Isabellaville|
|   77988.71|    West Robertside|
|   77871.75|   North Jeremyport|
|   77567.85|       Port Destiny|
|   77460.07|      Kimberlyhaven|
|   77220.42|   Lake Jenniferton|
|   77143.61|         Staceyfort|
|   76984.21|        Lake Hailey|
|   76893.84|         Garciaview|
|   76560.59|         Jordantown|
|   76480.16|      New Jamestown|
|    76435.3|          Kellytown|
|   76408.19|  Lake Jasonchester|
|   76368.31|        Timothyfurt|
|   76246.96|     North Mercedes|
|   76003.47|        North Randy|
+-----------+-------------------+
only showing top 20 rows



### Missing Data

#### Separate numeric and categorical data

In [24]:
df_num = df.select([t[0] for t in df.dtypes if (t[1] == 'int') or (t[1] == 'double')])
df_cat = df.select([t[0] for t in df.dtypes if t[1] == 'string'])

In [4]:
df = spark.read.csv('E:\\SparkScala\\ContainsNull.csv',inferSchema=True, header=True)

In [18]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



#### Search for missings

In [19]:
from pyspark.sql import functions as F

df_null = df.select(*(F.sum(F.col(c).isNull().cast("Double")).alias(c) for c in df.columns)).toPandas()
df_null

Unnamed: 0,Id,Name,Sales
0,0.0,2.0,2.0


**Drop rows with all null values**

In [24]:
df.na.drop(how='all').show() 

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



**Drop na values of selected columns**

In [25]:
df.na.drop(subset=['Sales']).show() 

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



**Fill null values**

In [26]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [27]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [13]:
df.na.fill(float('nan')).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  NaN|
|emp2| null|  NaN|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [28]:
df.na.fill('No Name', subset = ['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



**Fill with the mean**

In [29]:
from pyspark.sql.functions import mean

mean_val = df.select(mean(df['Sales'])).collect()
mean_val

[Row(avg(Sales)=400.5)]

In [30]:
mean_val[0][0]

400.5

In [31]:
mean_sales = mean_val[0][0]

df.na.fill(mean_sales,['Sales']).show()
#in one line: df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Dates and Timestamps

In [35]:
df = spark.read.csv('E:\\SparkScala\\appl_stock.csv',inferSchema=True, header=True)

df.select(['Date','Open']).show()

+----------+------------------+
|      Date|              Open|
+----------+------------------+
|2010-01-04|        213.429998|
|2010-01-05|        214.599998|
|2010-01-06|        214.379993|
|2010-01-07|            211.75|
|2010-01-08|        210.299994|
|2010-01-11|212.79999700000002|
|2010-01-12|209.18999499999998|
|2010-01-13|        207.870005|
|2010-01-14|210.11000299999998|
|2010-01-15|210.92999500000002|
|2010-01-19|        208.330002|
|2010-01-20|        214.910006|
|2010-01-21|        212.079994|
|2010-01-22|206.78000600000001|
|2010-01-25|202.51000200000001|
|2010-01-26|205.95000100000001|
|2010-01-27|        206.849995|
|2010-01-28|        204.930004|
|2010-01-29|        201.079996|
|2010-02-01|192.36999699999998|
+----------+------------------+
only showing top 20 rows



In [45]:
from pyspark.sql.functions import (dayofmonth, hour,dayofyear, dayofyear, weekofyear, format_number,date_format, year)

In [40]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [42]:
df.select(weekofyear(df['Date'])).show()

+----------------+
|weekofyear(Date)|
+----------------+
|               1|
|               1|
|               1|
|               1|
|               1|
|               2|
|               2|
|               2|
|               2|
|               2|
|               3|
|               3|
|               3|
|               3|
|               4|
|               4|
|               4|
|               4|
|               4|
|               5|
+----------------+
only showing top 20 rows



In [55]:
newdf = df.withColumn('Year',year(df['Date']))
result = newdf.groupby('Year').mean().select(["Year","avg(Close)"])
result.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2010| 259.8424600000002|
|2011|364.00432532142867|
|2012| 576.0497195640002|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2015|120.03999980555547|
|2016|104.60400786904763|
+----+------------------+



In [58]:
result2 = result.withColumnRenamed("avg(Close)","Avg Close")

result2.select(["Year",format_number("Avg Close",2).alias("Avg Close")]).show()

+----+---------+
|Year|Avg Close|
+----+---------+
|2010|   259.84|
|2011|   364.00|
|2012|   576.05|
|2013|   472.63|
|2014|   295.40|
|2015|   120.04|
|2016|   104.60|
+----+---------+



## Machine Learning

### Linear regression

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('lr_ecommerce_customers').getOrCreate()

data = spark.read.csv('E:\\SparkScala\\Ecommerce_Customers.csv',inferSchema=True, header=True)

In [112]:
data.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [113]:
for item in data.head(1)[0]:
    print(item)

mstephenson@fernandez.com
835 Frank TunnelWrightmouth, MI 82180-9605
Violet
34.49726772511229
12.65565114916675
39.57766801952616
4.0826206329529615
587.9510539684005


#### Preparing data for ML: label and features

In [114]:
data.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [115]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['Avg Session Length','Time on App','Time on Website','Length of Membership']
                           ,outputCol='features')

In [116]:
output = assembler.transform(data)
output.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)
 |-- features: vector (nullable = true)



In [118]:
final_data = output.select('features','Yearly Amount Spent')
final_data.show()

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
|[33.8710378793419...|   637.102447915074|
|[32.0215955013870...|  521.5721747578274|
|[32.7391429383803...|  549.9041461052942|
|[33.9877728956856...|  570.2004089636196|
|[31.9365486184489...|  427.1993848953282|
|[33.9925727749537...|  492.6060127179966|
|[33.8793608248049...|  522.3374046069357|
|[29.5324289670579...|  408.6403510726275|
|[33.1903340437226...|  573.4158673313865|
|[32.3879758531538...|  470.4527333009554|
|[30.7377203726281...|  461.7807421962299|
|[32.1253868972878...| 457.84769594494855|
|[32.3388993230671...| 407.70454754954415|
|[32.1878120459321...|  452.3156754800354|
|[32.6178560628234...|   605.061038804892|
+----------

#### Train and Test data

In [120]:
train_data, test_data = final_data.randomSplit([0.7,0.3])

**Create Linear Regression object**

In [121]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression (featuresCol = 'features', labelCol = 'Yearly Amount Spent', predictionCol = 'prediction')

**Training**

In [122]:
lrModel = lr.fit(train_data)

In [123]:
lrModel.coefficients

DenseVector([25.6839, 38.8674, 0.6419, 61.6344])

In [124]:
lrModel.intercept

-1059.56019185072

In [125]:
training_summary = lrModel.summary
training_summary.rootMeanSquaredError

9.728296113196304

#### Testing

In [126]:
test_results = lrModel.evaluate(test_data)

In [127]:
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -4.966921858349053|
| 10.521945640504896|
| -17.01456302984309|
| -4.343074215920467|
|-13.015774219660841|
|-1.1233461231821593|
|-3.7184452482358665|
| 3.7746043674629846|
| -4.425259598521279|
| -6.262894048511953|
|-14.631307920429435|
| 17.038919635449304|
|  6.179346828807354|
|-0.8930956727532475|
| 2.6319959197724643|
|-18.143341026325118|
| 1.0250391304259665|
|  8.284164225009647|
|-10.666177572081665|
|-17.379468433818204|
+-------------------+
only showing top 20 rows



In [128]:
test_results.rootMeanSquaredError

10.394822527606221

In [129]:
test_results.r2

0.983209533997304

**Predictions**

In [131]:
unlabeled_data = test_data.select('features')

In [132]:
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|[30.4925366965402...|
|[30.7377203726281...|
|[30.8162006488763...|
|[30.8364326747734...|
|[31.0662181616375...|
|[31.2606468698795...|
|[31.2681042107507...|
|[31.3584771924370...|
|[31.4252268808548...|
|[31.5147378578019...|
|[31.5741380228732...|
|[31.6098395733896...|
|[31.6548096756927...|
|[31.6610498227460...|
|[31.7366356860502...|
|[31.8164283341993...|
|[31.8293464559211...|
|[31.8512531286083...|
|[31.8648325480987...|
|[31.9048571310136...|
+--------------------+
only showing top 20 rows



In [133]:
predictions = lrModel.transform(unlabeled_data)
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[30.4925366965402...| 287.4381675782636|
|[30.7377203726281...|  451.258796555725|
|[30.8162006488763...| 283.1009039783121|
|[30.8364326747734...| 471.8449746429101|
|[31.0662181616375...| 461.9490674273352|
|[31.2606468698795...|422.44997738013353|
|[31.2681042107507...| 427.1889784220598|
|[31.3584771924370...| 491.4013460820124|
|[31.4252268808548...| 535.1919782532832|
|[31.5147378578019...|496.07538204497337|
|[31.5741380228732...| 559.0405800810163|
|[31.6098395733896...|427.50663001565886|
|[31.6548096756927...|469.08407689874116|
|[31.6610498227460...| 417.2514492526541|
|[31.7366356860502...|494.30145033575945|
|[31.8164283341993...| 519.2658325299815|
|[31.8293464559211...|  384.127298857549|
|[31.8512531286083...|464.70808244178875|
|[31.8648325480987...|450.55745804889534|
|[31.9048571310136...|491.32932585663434|
+--------------------+------------

### Logistic Regression

In [83]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('logr').getOrCreate()

df = spark.read.csv('E:\\SparkScala\\heart.csv',inferSchema=True, header=True)

In [85]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)



In [86]:
df.columns

['age',
 'sex',
 'cp',
 'trestbps',
 'chol',
 'fbs',
 'restecg',
 'thalach',
 'exang',
 'oldpeak',
 'slope',
 'ca',
 'thal',
 'target']

In [87]:
for item in df.head(5):
    print(item)
    print('/n')

Row(age=63, sex=1, cp=3, trestbps=145, chol=233, fbs=1, restecg=0, thalach=150, exang=0, oldpeak=2.3, slope=0, ca=0, thal=1, target=1)
/n
Row(age=37, sex=1, cp=2, trestbps=130, chol=250, fbs=0, restecg=1, thalach=187, exang=0, oldpeak=3.5, slope=0, ca=0, thal=2, target=1)
/n
Row(age=41, sex=0, cp=1, trestbps=130, chol=204, fbs=0, restecg=0, thalach=172, exang=0, oldpeak=1.4, slope=2, ca=0, thal=2, target=1)
/n
Row(age=56, sex=1, cp=1, trestbps=120, chol=236, fbs=0, restecg=1, thalach=178, exang=0, oldpeak=0.8, slope=2, ca=0, thal=2, target=1)
/n
Row(age=57, sex=0, cp=0, trestbps=120, chol=354, fbs=0, restecg=1, thalach=163, exang=1, oldpeak=0.6, slope=2, ca=0, thal=2, target=1)
/n


**Preparing data for ML: label and features**

In [88]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['age',
 'sex',
 'cp',
 'trestbps',
 'chol',
 'fbs',
 'restecg',
 'thalach',
 'exang',
 'oldpeak',
 'slope',
 'ca',
 'thal'],outputCol='features')

In [91]:
output = assembler.transform(df)
output.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)
 |-- features: vector (nullable = true)



In [92]:
final_data = output.select('features','target')
final_data.show()

+--------------------+------+
|            features|target|
+--------------------+------+
|[63.0,1.0,3.0,145...|     1|
|[37.0,1.0,2.0,130...|     1|
|[41.0,0.0,1.0,130...|     1|
|[56.0,1.0,1.0,120...|     1|
|[57.0,0.0,0.0,120...|     1|
|[57.0,1.0,0.0,140...|     1|
|[56.0,0.0,1.0,140...|     1|
|[44.0,1.0,1.0,120...|     1|
|[52.0,1.0,2.0,172...|     1|
|[57.0,1.0,2.0,150...|     1|
|[54.0,1.0,0.0,140...|     1|
|[48.0,0.0,2.0,130...|     1|
|[49.0,1.0,1.0,130...|     1|
|[64.0,1.0,3.0,110...|     1|
|[58.0,0.0,3.0,150...|     1|
|[50.0,0.0,2.0,120...|     1|
|[58.0,0.0,2.0,120...|     1|
|[66.0,0.0,3.0,150...|     1|
|[43.0,1.0,0.0,150...|     1|
|[69.0,0.0,3.0,140...|     1|
+--------------------+------+
only showing top 20 rows



#### Train and Test data

In [95]:
train_data, test_data = final_data.randomSplit([0.7,0.3])

**Create Logistic Regression object**

In [98]:
from pyspark.ml.classification import LogisticRegression

logreg = LogisticRegression (labelCol = 'target')

**Training**

In [100]:
logregModel = logreg.fit(train_data)

In [101]:
logregModel.coefficients

DenseVector([-0.025, -1.4063, 1.0545, -0.0334, -0.0011, 0.5954, 0.6333, 0.0186, -0.5778, -0.9553, 0.2247, -0.7077, -0.8241])

In [102]:
logregModel .intercept

6.20561952395422

In [104]:
training_summary = logregModel.summary
training_summary.predictions.describe().show()

+-------+------------------+-------------------+
|summary|            target|         prediction|
+-------+------------------+-------------------+
|  count|               219|                219|
|   mean|0.5616438356164384|  0.589041095890411|
| stddev| 0.497322225021984|0.49313497524305433|
|    min|               0.0|                0.0|
|    max|               1.0|                1.0|
+-------+------------------+-------------------+



**Predictions and Evaluation**

In [106]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

pred_and_labels = logregModel.evaluate(test_data)

pred_and_labels.predictions.show()

+--------------------+------+--------------------+--------------------+----------+
|            features|target|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(13,[0,1,3,4,7,10...|     1|[-1.7865646103620...|[0.14349442895241...|       1.0|
|(13,[0,1,3,4,7,10...|     1|[-0.5992549991149...|[0.35451415672557...|       1.0|
|(13,[0,3,4,7,9,11...|     0|[3.84687759780020...|[0.97909985623355...|       0.0|
|(13,[0,3,4,7,10,1...|     1|[-1.7873140332805...|[0.14340234665081...|       1.0|
|[29.0,1.0,1.0,130...|     1|[-3.1196863953224...|[0.04230247507723...|       1.0|
|[37.0,0.0,2.0,120...|     1|[-5.7391564000637...|[0.00320716241245...|       1.0|
|[38.0,1.0,2.0,138...|     1|[-0.9760162062247...|[0.27368297020338...|       1.0|
|[40.0,1.0,0.0,110...|     0|[2.67713446245765...|[0.93566384197944...|       0.0|
|[41.0,0.0,1.0,130...|     1|[-2.3292050968815...|[0.08873291762431...|       1.0|
|[41

In [107]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='target')

In [108]:
auccuracy = my_eval.evaluate(pred_and_labels.predictions)
auccuracy

0.8333333333333334

### Tree Methods (DecisionTree, GradientBoostingTree, RandomForest)

In [121]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('tree').getOrCreate()

df = spark.read.csv('E:\\SparkScala\\college.csv',inferSchema=True, header=True)

In [122]:
df.printSchema()

root
 |-- School: string (nullable = true)
 |-- Private: string (nullable = true)
 |-- Apps: integer (nullable = true)
 |-- Accept: integer (nullable = true)
 |-- Enroll: integer (nullable = true)
 |-- Top10perc: integer (nullable = true)
 |-- Top25perc: integer (nullable = true)
 |-- F_Undergrad: integer (nullable = true)
 |-- P_Undergrad: integer (nullable = true)
 |-- Outstate: integer (nullable = true)
 |-- Room_Board: integer (nullable = true)
 |-- Books: integer (nullable = true)
 |-- Personal: integer (nullable = true)
 |-- PhD: integer (nullable = true)
 |-- Terminal: integer (nullable = true)
 |-- S_F_Ratio: double (nullable = true)
 |-- perc_alumni: integer (nullable = true)
 |-- Expend: integer (nullable = true)
 |-- Grad_Rate: integer (nullable = true)



In [123]:
for item in df.head(3):
    print(item)
    print('/n')

Row(School='Abilene Christian University', Private='Yes', Apps=1660, Accept=1232, Enroll=721, Top10perc=23, Top25perc=52, F_Undergrad=2885, P_Undergrad=537, Outstate=7440, Room_Board=3300, Books=450, Personal=2200, PhD=70, Terminal=78, S_F_Ratio=18.1, perc_alumni=12, Expend=7041, Grad_Rate=60)
/n
Row(School='Adelphi University', Private='Yes', Apps=2186, Accept=1924, Enroll=512, Top10perc=16, Top25perc=29, F_Undergrad=2683, P_Undergrad=1227, Outstate=12280, Room_Board=6450, Books=750, Personal=1500, PhD=29, Terminal=30, S_F_Ratio=12.2, perc_alumni=16, Expend=10527, Grad_Rate=56)
/n
Row(School='Adrian College', Private='Yes', Apps=1428, Accept=1097, Enroll=336, Top10perc=22, Top25perc=50, F_Undergrad=1036, P_Undergrad=99, Outstate=11250, Room_Board=3750, Books=400, Personal=1165, PhD=53, Terminal=66, S_F_Ratio=12.9, perc_alumni=30, Expend=8735, Grad_Rate=54)
/n


**Preparing data for ML: label and features**

In [124]:
df.columns

['School',
 'Private',
 'Apps',
 'Accept',
 'Enroll',
 'Top10perc',
 'Top25perc',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'Books',
 'Personal',
 'PhD',
 'Terminal',
 'S_F_Ratio',
 'perc_alumni',
 'Expend',
 'Grad_Rate']

In [125]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['Apps',
 'Accept',
 'Enroll',
 'Top10perc',
 'Top25perc',
 'F_Undergrad',
 'P_Undergrad',
 'Outstate',
 'Room_Board',
 'Books',
 'Personal',
 'PhD',
 'Terminal',
 'S_F_Ratio',
 'perc_alumni',
 'Expend',
 'Grad_Rate'],outputCol='features')

In [126]:
output = assembler.transform(df)
output.printSchema()

root
 |-- School: string (nullable = true)
 |-- Private: string (nullable = true)
 |-- Apps: integer (nullable = true)
 |-- Accept: integer (nullable = true)
 |-- Enroll: integer (nullable = true)
 |-- Top10perc: integer (nullable = true)
 |-- Top25perc: integer (nullable = true)
 |-- F_Undergrad: integer (nullable = true)
 |-- P_Undergrad: integer (nullable = true)
 |-- Outstate: integer (nullable = true)
 |-- Room_Board: integer (nullable = true)
 |-- Books: integer (nullable = true)
 |-- Personal: integer (nullable = true)
 |-- PhD: integer (nullable = true)
 |-- Terminal: integer (nullable = true)
 |-- S_F_Ratio: double (nullable = true)
 |-- perc_alumni: integer (nullable = true)
 |-- Expend: integer (nullable = true)
 |-- Grad_Rate: integer (nullable = true)
 |-- features: vector (nullable = true)



In [127]:
#Changed label column to binary

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = 'Private', outputCol = 'PrivateIndex')

output_fixed = indexer.fit(output).transform(output)

In [128]:
final_data = output_fixed.select('features','PrivateIndex')
final_data.show()

+--------------------+------------+
|            features|PrivateIndex|
+--------------------+------------+
|[1660.0,1232.0,72...|         0.0|
|[2186.0,1924.0,51...|         0.0|
|[1428.0,1097.0,33...|         0.0|
|[417.0,349.0,137....|         0.0|
|[193.0,146.0,55.0...|         0.0|
|[587.0,479.0,158....|         0.0|
|[353.0,340.0,103....|         0.0|
|[1899.0,1720.0,48...|         0.0|
|[1038.0,839.0,227...|         0.0|
|[582.0,498.0,172....|         0.0|
|[1732.0,1425.0,47...|         0.0|
|[2652.0,1900.0,48...|         0.0|
|[1179.0,780.0,290...|         0.0|
|[1267.0,1080.0,38...|         0.0|
|[494.0,313.0,157....|         0.0|
|[1420.0,1093.0,22...|         0.0|
|[4302.0,992.0,418...|         0.0|
|[1216.0,908.0,423...|         0.0|
|[1130.0,704.0,322...|         0.0|
|[3540.0,2001.0,10...|         1.0|
+--------------------+------------+
only showing top 20 rows



#### Train and Test data

In [129]:
train_data, test_data = final_data.randomSplit([0.7,0.3])

**Create Tree methods objects/classifiers**

In [148]:
from pyspark.ml.classification import (DecisionTreeClassifier, GBTClassifier, RandomForestClassifier)

# Use mostly defaults to make this comparison "fair"

dtc = DecisionTreeClassifier(labelCol='PrivateIndex',featuresCol='features')
rfc = RandomForestClassifier(numTrees = 100,labelCol='PrivateIndex',featuresCol='features')
gbt = GBTClassifier(labelCol='PrivateIndex',featuresCol='features')

**Training**

In [158]:
# Train the models (its three models, so it might take some time)
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

**Feature importance**

In [159]:
dtc_model.featureImportances

SparseVector(17, {1: 0.0166, 3: 0.0088, 4: 0.0078, 5: 0.3156, 6: 0.0511, 7: 0.5681, 14: 0.0182, 16: 0.0137})

In [160]:
rfc_model.featureImportances

SparseVector(17, {0: 0.0337, 1: 0.0592, 2: 0.1149, 3: 0.0132, 4: 0.0057, 5: 0.1922, 6: 0.0664, 7: 0.2429, 8: 0.0591, 9: 0.0079, 10: 0.0093, 11: 0.0107, 12: 0.0126, 13: 0.0672, 14: 0.0312, 15: 0.0477, 16: 0.0261})

In [161]:
gbt_model.featureImportances

SparseVector(17, {0: 0.0271, 1: 0.008, 2: 0.0105, 3: 0.0599, 4: 0.0519, 5: 0.3134, 6: 0.0288, 7: 0.291, 8: 0.0699, 9: 0.0082, 10: 0.0077, 11: 0.0131, 12: 0.0186, 13: 0.0125, 14: 0.0186, 15: 0.018, 16: 0.0427})

**Test**

In [144]:
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

**Predictions and Evaluation**

In [146]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Select (prediction, true label) and compute test error
binary_evaluator = BinaryClassificationEvaluator(labelCol="PrivateIndex")


dtc_be = binary_evaluator.evaluate(dtc_predictions)
rfc_be = binary_evaluator.evaluate(rfc_predictions)
gbt_be = binary_evaluator.evaluate(gbt_predictions)

print('-'*80)
print('A single decision tree had an accuracy of: {0:2.2f}%'.format(dtc_be*100))
print('-'*80)
print('A random forest ensemble had an accuracy of: {0:2.2f}%'.format(rfc_be*100))
print('-'*80)
print('A ensemble using GBT had an accuracy of: {0:2.2f}%'.format(gbt_be*100))



--------------------------------------------------------------------------------
A single decision tree had an accuracy of: 87.76%
--------------------------------------------------------------------------------
A random forest ensemble had an accuracy of: 97.03%
--------------------------------------------------------------------------------
A ensemble using GBT had an accuracy of: 95.68%


In [147]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="PrivateIndex", metricName = 'accuracy')


dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

print('-'*80)
print('A single decision tree had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*80)
print('A random forest ensemble had an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*80)
print('A ensemble using GBT had an accuracy of: {0:2.2f}%'.format(gbt_acc*100))


--------------------------------------------------------------------------------
A single decision tree had an accuracy of: 92.59%
--------------------------------------------------------------------------------
A random forest ensemble had an accuracy of: 92.59%
--------------------------------------------------------------------------------
A ensemble using GBT had an accuracy of: 92.59%


## Clustering

### Kmeans

In [47]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName('cluster').getOrCreate()

data = spark.read.csv('E:\\SparkScala\\hack_data.csv',inferSchema=True, header=True)

In [48]:
data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)



In [49]:
data.head(1)

[Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37)]

**Scaling data**

In [50]:
from pyspark.ml.feature import VectorAssembler

In [51]:
data.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed']

In [52]:
assembler = VectorAssembler(inputCols=['Session_Connection_Time',
'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'WPM_Typing_Speed'], outputCol='features')

final_data=assembler.transform(data)

final_data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)



In [53]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler (inputCol='features', outputCol='scaledFeatures')

scaler_model = scaler.fit(final_data)

cluster_final_data  = scaler_model.transform(final_data)

**Training**

In [25]:
kmeans2 = KMeans(featuresCol='scaledFeatures',k=2).setSeed(1)
kmeans3 = KMeans(featuresCol='scaledFeatures',k=3).setSeed(1)

model_k2 = kmeans2.fit(cluster_final_data)
model_k3 = kmeans3.fit(cluster_final_data)

model_k3.transform(cluster_final_data).groupBy('prediction').count().show()

NameError: name 'KMeans' is not defined

In [60]:
model_k2.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+



In [63]:
centers_k2 = model_k2.clusterCenters()
centers_k2

[array([2.99991988, 2.92319035, 1.05261534, 3.20390443, 4.51321315,
        3.28474   ]),
 array([1.26023837, 1.31829808, 0.99280765, 1.36491885, 2.5625043 ,
        5.26676612])]

**Prediction**

In [68]:
model_k2.transform(cluster_final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 20 rows



## Recommender System - ALS Spark ml

In [69]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName('rec').getOrCreate()

data = spark.read.csv('E:\\SparkScala\\movielens_ratings.csv',inferSchema=True, header=True)

In [70]:
data.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [72]:
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



**Training**

In [73]:
training,test = data.randomSplit([0.8,0.2])

In [74]:
als =  ALS(maxIter=5,regParam=0.01,userCol='userId',itemCol='movieId',ratingCol='rating')

In [75]:
model = als.fit(training)

**Prediction and evaluation**

In [76]:
predictions = model.transform(test)

In [77]:
predictions.show()

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|     31|   4.0|    12|-0.7838117|
|     31|   3.0|     7| 1.7121273|
|     31|   1.0|    24| 0.7028988|
|     31|   3.0|    14| 2.2068293|
|     31|   1.0|    18|  3.044821|
|     85|   1.0|    13| 3.3847582|
|     85|   1.0|     5| 0.7804052|
|     85|   1.0|     2| 1.5486727|
|     65|   1.0|    19| 0.9705255|
|     65|   5.0|    23|0.80290216|
|     65|   1.0|    24| 1.8640641|
|     53|   3.0|    20|  1.303222|
|     53|   5.0|     8| 3.9632707|
|     53|   3.0|    14| 1.4016422|
|     78|   1.0|    12|0.94908607|
|     78|   1.0|     2|0.43352938|
|     34|   4.0|     2| 3.1728232|
|     81|   3.0|    26|0.23181152|
|     81|   1.0|     1| 1.4737167|
|     81|   1.0|    21| 1.6361859|
+-------+------+------+----------+
only showing top 20 rows



In [78]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.792724885582561


**Single user recommendation**

In [79]:
single_user = test.filter(test['userId']==11).select(['movieId','userId'])
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|     10|    11|
|     12|    11|
|     13|    11|
|     27|    11|
|     36|    11|
|     38|    11|
|     40|    11|
|     41|    11|
|     47|    11|
|     62|    11|
|     71|    11|
|     80|    11|
|     89|    11|
+-------+------+



In [80]:
recomendations = model.transform(single_user)

recomendations.orderBy('prediction',ascending=False).show()

+-------+------+-----------+
|movieId|userId| prediction|
+-------+------+-----------+
|     27|    11|   5.276105|
|     71|    11|   4.338571|
|     38|    11|  2.5989215|
|     12|    11|  1.5053954|
|     36|    11|  1.2785971|
|     89|    11|  1.1557362|
|     13|    11|  1.0427175|
|     40|    11| 0.70125484|
|     10|    11| 0.20727384|
|     80|    11| 0.11437428|
|     41|    11|-0.26433903|
|     47|    11| -1.0566144|
|     62|    11| -3.1297584|
+-------+------+-----------+



## NLP

In [83]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [84]:
sentenceDataFrame = spark.createDataFrame([(0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

sentenceDataFrame.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic,regressi...|
+---+--------------------+



**Tokenizer**

In [88]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+



In [89]:
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words").withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+-----------------------------------+------------------------------------------+------+



**StopWords**

In [90]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



**n-grams**

An n-gram is a sequence of nn tokens (typically words) for some integer nn. The NGram class can be used to transform input features into nn-grams.

In [91]:
from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



**TF-IDF**

Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus

In [92]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

sentenceData.show()

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|  0.0|Hi I heard about ...|
|  0.0|I wish Java could...|
|  1.0|Logistic regressi...|
+-----+--------------------+



In [93]:
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|
|  0.0|I wish Java could...|[i, wish, java, c...|
|  1.0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [94]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+



**CountVectorizer**

In [95]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



**Example**

In [99]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('nlp').getOrCreate()
data = spark.read.csv("E:\\SparkScala\\SMSSpamCollection",inferSchema=True,sep='\t')
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','text')
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [102]:
from pyspark.sql.functions import length

data = data.withColumn('length',length(data['text']))

data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [103]:
data.groupBy('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [106]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer

tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_to_num = StringIndexer(inputCol='class',outputCol='label')

In [107]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

clean_up = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

### Model

**Pipeline**

In [108]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes()

#Pipeline

from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[ham_spam_to_num,tokenizer,stopremove,count_vec,idf,clean_up])

cleaner = data_prep_pipe.fit(data)

clean_data = cleaner.transform(data)

**Training**

In [111]:
clean_data = clean_data.select(['label','features'])

(training,testing) = clean_data.randomSplit([0.7,0.3])

spam_predictor = nb.fit(training)

data.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



**Testing**

In [112]:
test_results = spam_predictor.transform(testing)
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,7,8...|[-806.91554692136...|[1.0,5.0313921217...|       0.0|
|  0.0|(13424,[0,1,2,41,...|[-1060.0763099804...|[1.0,1.0010834603...|       0.0|
|  0.0|(13424,[0,1,3,9,1...|[-591.60623033257...|[0.99999999996580...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-1170.2132665604...|[1.0,4.4268199765...|       0.0|
|  0.0|(13424,[0,1,7,15,...|[-658.14673929182...|[1.0,1.1765181807...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-558.36937489931...|[1.0,3.8397344066...|       0.0|
|  0.0|(13424,[0,1,9,14,...|[-558.36937489931...|[1.0,3.8397344066...|       0.0|
|  0.0|(13424,[0,1,14,18...|[-1379.6926927541...|[1.0,7.0657612800...|       0.0|
|  0.0|(13424,[0,1,17,19...|[-800.14645937540...|[1.0,8.8634507208...|       0.0|
|  0.0|(13424,[0

**Evaluation**

In [113]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting spam was: {}".format(acc))

Accuracy of model at predicting spam was: 0.9257039277582748
