In [1]:
import findspark

In [2]:
# Point findspark at the local Spark installation. Prefer the SPARK_HOME
# environment variable so the notebook runs on other machines; fall back to the
# original install path when it is not set.
import os
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.2.1-bin-hadoop2.7/'))

In [3]:
import pyspark

## Spark DataFrame Basics

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Start (or reuse) the Spark session used by the cells below.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/27 12:30:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Load the sample people records; the schema is inferred from the JSON values.
df = spark.read.json(path='people.json')

                                                                                

In [7]:
# Print the rows (at most 20, the default).
df.show(n=20)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [8]:
# Inspect the schema Spark inferred from the JSON (age -> long, name -> string).
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [9]:
# Column names, in schema order.
list(df.columns)

['age', 'name']

In [10]:
# count / mean / stddev / min / max for every column (nulls are excluded from count).
summary_stats = df.describe()
summary_stats.show()

[Stage 2:>                                                          (0 + 1) / 1]

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



                                                                                

In [11]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

In [12]:
# Explicit schema: 'age' as a nullable int (instead of the inferred long) and
# 'name' as a nullable string.
data_schema = [
    StructField('age', IntegerType(), nullable=True),
    StructField('name', StringType(), nullable=True),
]

In [13]:
# Wrap the field list in a StructType so it can be passed to the reader.
final_struc = StructType(data_schema)

In [14]:
# Re-read the same file, this time enforcing the explicit schema.
df = spark.read.schema(final_struc).json('people.json')

In [15]:
# Confirm the explicit schema was applied: age is now integer rather than long.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [16]:
# Same rows as before; only the column types changed.
df.show(20, truncate=True)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## Spark DataFrame Basics Part Two

In [17]:
# Project a single column.
df.select(df['age']).show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [18]:
# head(n) returns a list of Row objects; index into it to get the first Row.
first_rows = df.head(2)
first_rows[0]

Row(age=None, name='Michael')

In [19]:
# select() also accepts column names as separate arguments.
df.select('age', 'name').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [20]:
# Derive a new column; arithmetic on a null age yields null.
doubled = df.withColumn('double_age', df['age'] * 2)
doubled.show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



In [21]:
# Rename a column. This returns a new DataFrame; `df` itself keeps 'age'.
renamed = df.withColumnRenamed('age', 'my_new_age')
renamed.show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [22]:
# Register the DataFrame as a temporary view so spark.sql can query it.
df.createOrReplaceTempView(name='report')

In [23]:
# The upper-case REPORT resolves to the 'report' view registered above.
results = spark.sql('SELECT * FROM REPORT')
results.show(n=20)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [24]:
# Filter in SQL: keep only the row whose age is 30.
new_results = spark.sql('SELECT * FROM REPORT WHERE AGE = 30')
new_results.show(20)

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [25]:
# Filter on a string value in SQL. Use single quotes for string literals:
# standard SQL (and Spark when double-quoted identifiers are enabled) treats
# "Michael" as a column name, not a string.
new_results = spark.sql("SELECT * FROM REPORT WHERE NAME = 'Michael'")
new_results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+



In [26]:
# Rename a column in SQL with AS; the alias becomes the output header.
new_results = spark.sql('SELECT NAME AS NOMES FROM REPORT')
new_results.show(20)

+-------+
|  NOMES|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



## Spark DataFrame Basic Operations

In [27]:
# Load daily stock prices: the first line holds column names, and Spark
# infers column types from the values.
df = (
    spark.read
    .option('inferSchema', True)
    .option('header', True)
    .csv('appl_stock.csv')
)

In [28]:
# Date is inferred as a string; prices as double and Volume as integer.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [29]:
# Peek at the first ten trading days.
df.show(n=10)

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [30]:
# Expose the stock data to SQL, then select Open/Close for days closing under 500.
df.createOrReplaceTempView('data')
grab = spark.sql('SELECT OPEN, CLOSE FROM DATA WHERE CLOSE < 500')
grab.show(n=20)

+------------------+------------------+
|              OPEN|             CLOSE|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [31]:
# Alternative (not executed): filter() also accepts a SQL expression string:
# df.filter('Close < 500').show()

In [32]:
# DataFrame-API version of the SQL query above: filter with a Column
# expression, then project two columns.
close_under_500 = df.filter(df['Close'] < 500)
close_under_500.select('OPEN', 'CLOSE').show()

+------------------+------------------+
|              OPEN|             CLOSE|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



In [33]:
# Days that opened above 200 but closed below it. Re-registering the view keeps
# this cell runnable on its own.
df.createOrReplaceTempView('data')
grab = spark.sql('SELECT * FROM DATA WHERE CLOSE < 200 AND OPEN > 200')
grab.show(n=20)

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



In [34]:
# Same condition via the DataFrame API. Combine Column conditions with & and
# parenthesise each comparison, because & binds tighter than < and > in Python.
crossed_200 = (df['Close'] < 200) & (df['Open'] > 200)
df.filter(crossed_200).show()

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



In [35]:
# Exact equality on a double column. It matches here because the CSV value
# parses to the same double as the literal; it is fragile for computed values.
df.where(df['Low'] == 197.16).show()

+----------+------------------+----------+------+------+---------+---------+
|      Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+----------+------------------+----------+------+------+---------+---------+
|2010-01-22|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+----------+------------------+----------+------+------+---------+---------+



In [36]:
# collect() brings matching rows back to the driver as a list of Row objects,
# so their values can be used from Python (show() only prints them). Only
# collect results that are small enough for the driver to hold.
condition = (df['Close'] < 200) & (df['Open'] > 200)
result = df.filter(condition).collect()
row = result[0]
row.asDict()

{'Date': '2010-01-22',
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

## Groupby and Aggregate Operations

In [37]:
# Get the Spark session for this section; getOrCreate() reuses an already
# active session if there is one.
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName('Eggs')
         .getOrCreate())

In [38]:
# Load per-person sales records (header row, inferred types) and check the schema.
df = (spark.read
      .option('inferSchema', True)
      .option('header', True)
      .csv('sales_info.csv'))
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [39]:
# All 12 sales records.
df.show(n=20)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [40]:
# Sum each numeric column per company (only Sales is numeric here).
by_company = df.groupBy('Company')
by_company.sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [41]:
# Aggregate over the whole DataFrame (no grouping): total Sales.
total_sales = df.agg({'Sales': 'sum'})
total_sales.show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [42]:
# Importing functions from spark
from pyspark.sql.functions import countDistinct, avg, stddev, format_number

In [43]:
# Mean of Sales across all rows, under a friendlier column header.
average_sales = avg('Sales').alias('Average Sales')
df.select(average_sales).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [44]:
# Standard deviation of Sales, formatted to two decimal places.
sales_std = df.select(stddev('Sales').alias('STD'))
std_formatted = format_number('STD', 2).alias('std')
sales_std.select(std_formatted).show()

+------+
|   std|
+------+
|250.09|
+------+



In [45]:
# Original (unsorted) order, for comparison with the sorted views below.
df.show(n=20)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [46]:
# Ascending order (the default) by Sales.
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [47]:
# Descending order by Sales.
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



## Missing Data

In [48]:
# Session for the missing-data section (reuses the active session if any).
spark = (SparkSession.builder
         .appName('miss')
         .getOrCreate())

In [49]:
# Load a small file that contains missing Name and Sales values.
df = (spark.read
      .option('header', True)
      .option('inferSchema', True)
      .csv('ContainsNull.csv'))
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [50]:
# Drop a row only when every column is null; nothing is dropped here because
# every row has an Id.
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [51]:
# Keep rows with at least 2 non-null values (emp2 has only its Id).
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [52]:
# Only consider the Sales column when deciding which rows to drop.
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [53]:
# Replace null Name values with a placeholder; other columns are untouched.
df.na.fill('Nameless', subset=['Name']).show()

+----+--------+-----+
|  Id|    Name|Sales|
+----+--------+-----+
|emp1|    John| null|
|emp2|Nameless| null|
|emp3|Nameless|345.0|
|emp4|   Cindy|456.0|
+----+--------+-----+



In [54]:
# Fill missing Sales with the column's mean. mean() ignores nulls, so this is
# the average of the non-null values: (345 + 456) / 2 = 400.5.
from pyspark.sql.functions import mean
mean_value = df.select(mean(df['Sales'])).collect()
mean_sales = mean_value[0][0]
filled = df.fillna(mean_sales, subset=['Sales'])
filled.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



## Dates and Timestamps

In [55]:
# Session for the dates section (reuses the active session if any).
spark = (SparkSession.builder
         .appName('dates')
         .getOrCreate())

In [56]:
# Reload the stock data for the date examples.
df = (spark.read
      .option('header', True)
      .option('inferSchema', True)
      .csv('appl_stock.csv'))

In [57]:
# First five trading days.
df.show(n=5)

+----------+----------+----------+------------------+------------------+---------+------------------+
|      Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+----------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+----------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows



In [58]:
# Date alongside the opening price.
df.select('Date', 'Open').show()

+----------+------------------+
|      Date|              Open|
+----------+------------------+
|2010-01-04|        213.429998|
|2010-01-05|        214.599998|
|2010-01-06|        214.379993|
|2010-01-07|            211.75|
|2010-01-08|        210.299994|
|2010-01-11|212.79999700000002|
|2010-01-12|209.18999499999998|
|2010-01-13|        207.870005|
|2010-01-14|210.11000299999998|
|2010-01-15|210.92999500000002|
|2010-01-19|        208.330002|
|2010-01-20|        214.910006|
|2010-01-21|        212.079994|
|2010-01-22|206.78000600000001|
|2010-01-25|202.51000200000001|
|2010-01-26|205.95000100000001|
|2010-01-27|        206.849995|
|2010-01-28|        204.930004|
|2010-01-29|        201.079996|
|2010-02-01|192.36999699999998|
+----------+------------------+
only showing top 20 rows



In [59]:
from pyspark.sql.functions import (dayofmonth, dayofyear, hour, month,
                                   year, weekofyear, format_number, date_format)

In [60]:
# Alternative (not executed): extract the month number from each date:
#df.select(month(df['Date'])).show()

In [61]:
# Average closing price per year, step 1: derive a 'Year' column from 'Date'.
# Reference the column with the schema's own casing ('Date') so the cell does
# not rely on Spark's case-insensitive column resolution.
new_df = df.withColumn('Year', year(df['Date']))
new_df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
|2010-01-11|212.

In [62]:
# Average closing price per year, step 2: aggregate and order chronologically.
result = new_df.groupBy('Year').avg('Close').orderBy('Year')
result.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2010| 259.8424600000002|
|2011|364.00432532142867|
|2012| 576.0497195640002|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2015|120.03999980555547|
|2016|104.60400786904763|
+----+------------------+



In [63]:
# Round the yearly average closing price (not the date) to two decimal places
# for display, under a readable column header.
avg_close_2dp = format_number('avg(Close)', 2).alias('Avg Closing Price')
result.select(['Year', avg_close_2dp]).show()

+----+-----------------+
|Year|Avg Closing Price|
+----+-----------------+
|2010|           259.84|
|2011|           364.00|
|2012|           576.05|
|2013|           472.63|
|2014|           295.40|
|2015|           120.04|
|2016|           104.60|
+----+-----------------+



## Linear Regression

In [64]:
# NOTE(review): presumably numpy is needed by pyspark.ml below. If it is missing,
# install it into the kernel's environment with `%pip install numpy` (pin a version).
# !pip install numpy

In [65]:
from pyspark.ml.regression import LinearRegression

In [66]:
# Load the libsvm sample: a 'label' column plus a sparse 'features' vector.
training = spark.read.load('sample_linear_regression_data.txt', format='libsvm')

22/04/27 12:31:27 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.


In [67]:
# Labels and (truncated) 10-dimensional feature vectors.
training.show(n=20)

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
|  7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
|  5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+----------

In [68]:
# Linear regression estimator. The input columns match the 'features'/'label'
# columns produced by the libsvm reader; predictions go to 'prediction'.
lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
    predictionCol='prediction',
)

In [69]:
# Fit on the full dataset (no held-out data yet).
lrModel = lr.fit(dataset=training)

22/04/27 12:31:28 WARN Instrumentation: [3d78ca00] regParam is zero, which might cause numerical instability and overfitting.
22/04/27 12:31:29 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [70]:
# Display the fitted model's repr (uid and number of features).
lrModel

LinearRegressionModel: uid=LinearRegression_ae6edcd268be, numFeatures=10

In [71]:
# Fitted weights, one per feature (10 here).
lrModel.coefficients

DenseVector([0.0073, 0.8314, -0.8095, 2.4412, 0.5192, 1.1535, -0.2989, -0.5129, -0.6197, 0.6956])

In [72]:
# Fitted intercept (bias) term.
lrModel.intercept

0.14228558260358093

In [73]:
# Error metrics of the model measured on the same data it was trained on.
training_summary = lrModel.summary
print(f'MSE {training_summary.meanSquaredError}')
print(f'MAE {training_summary.meanAbsoluteError}')
print(f'RMSE {training_summary.rootMeanSquaredError}')

MSE 103.28843028724194
MAE 8.145215527783876
RMSE 10.16309157133015


In [74]:
# More realistic setup: reload the data so it can be split into separate
# train and test sets instead of scoring the model on its own training data.
all_data = spark.read.load('sample_linear_regression_data.txt', format='libsvm')

22/04/27 12:31:29 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.


In [75]:
# Split 70/30 into train and test sets. A fixed seed makes the split (and every
# metric computed from it) reproducible across kernel restarts.
train_Data, test_Data = all_data.randomSplit([0.7, 0.3], seed=42)

In [76]:
# Rows in the training split (depends on the random split).
train_Data.count()

355

In [77]:
# Rows in the test split (depends on the random split).
test_Data.count()

146

In [78]:
# Fit using only the training split.
correct_model = lr.fit(dataset=train_Data)

22/04/27 12:31:31 WARN Instrumentation: [157bb32d] regParam is zero, which might cause numerical instability and overfitting.


In [79]:
# Evaluation on test_Data: error metrics of the model trained on train_Data,
# measured on data it has not seen.
test_results = correct_model.evaluate(test_Data)
print(f'MSE {test_results.meanSquaredError}')
print(f'MAE {test_results.meanAbsoluteError}')
print(f'RMSE {test_results.rootMeanSquaredError}')

MSE 92.35451789178968
MAE 7.748461498636588
RMSE 9.61012579999813


In [80]:
# Drop the label to mimic new, unlabeled data arriving for prediction.
unlabeled_data = test_Data.select(test_Data['features'])
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 20 rows



In [81]:
# transform() appends a 'prediction' column to the input rows.
preds = correct_model.transform(dataset=unlabeled_data)
preds.show()

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...|  -4.018219241145397|
|(10,[0,1,2,3,4,5,...| 0.22167149908507836|
|(10,[0,1,2,3,4,5,...|  0.4326704362926248|
|(10,[0,1,2,3,4,5,...| -1.1406525170786395|
|(10,[0,1,2,3,4,5,...| -1.9752427863157647|
|(10,[0,1,2,3,4,5,...| -2.0460303884701196|
|(10,[0,1,2,3,4,5,...|  -1.082514454061944|
|(10,[0,1,2,3,4,5,...|  1.1292401075248064|
|(10,[0,1,2,3,4,5,...|     1.9114382267721|
|(10,[0,1,2,3,4,5,...| -1.4019502626208762|
|(10,[0,1,2,3,4,5,...|  1.8349841683798125|
|(10,[0,1,2,3,4,5,...|  -2.198566771275138|
|(10,[0,1,2,3,4,5,...|  3.9838190374263345|
|(10,[0,1,2,3,4,5,...|  1.8727255223040917|
|(10,[0,1,2,3,4,5,...|  -2.499417433656894|
|(10,[0,1,2,3,4,5,...|  1.8822489143387113|
|(10,[0,1,2,3,4,5,...|   3.737528486726122|
|(10,[0,1,2,3,4,5,...|-0.39498580237356895|
|(10,[0,1,2,3,4,5,...|  -1.589706299655978|
|(10,[0,1,2,3,4,5,...|  0.300209

### Linear Regression Example (Code-Along)

In [82]:
from pyspark.sql import SparkSession
# Session for the code-along (reuses the active session if any).
spark = (SparkSession.builder
         .appName('Lr')
         .getOrCreate())

In [83]:
from pyspark.ml.regression import LinearRegression

In [84]:
# Load the e-commerce customer data (header row, inferred types).
data = (spark.read
        .option('header', True)
        .option('inferSchema', True)
        .csv('Ecommerce_Customers.csv'))

In [85]:
# Three string columns (Email, Address, Avatar) and five numeric double columns;
# 'Yearly Amount Spent' is the target to predict.
data.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [86]:
# Target to predict: Yearly Amount Spent.
data.select(data['Yearly Amount Spent']).show(n=20)

+-------------------+
|Yearly Amount Spent|
+-------------------+
|  587.9510539684005|
|  392.2049334443264|
| 487.54750486747207|
|  581.8523440352177|
|  599.4060920457634|
|   637.102447915074|
|  521.5721747578274|
|  549.9041461052942|
|  570.2004089636196|
|  427.1993848953282|
|  492.6060127179966|
|  522.3374046069357|
|  408.6403510726275|
|  573.4158673313865|
|  470.4527333009554|
|  461.7807421962299|
| 457.84769594494855|
| 407.70454754954415|
|  452.3156754800354|
|   605.061038804892|
+-------------------+
only showing top 20 rows



In [87]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [88]:
# Column names, used to pick the feature columns below.
list(data.columns)

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [89]:
# Assemble the four numeric predictor columns into one vector column named
# 'features'. inputCols are the predictors; outputCol is the NAME of the new
# vector column (it is not the label). The label, 'Yearly Amount Spent', is
# deliberately left out of the feature vector.
feature_cols = ['Avg Session Length',
                'Time on App',
                'Time on Website',
                'Length of Membership']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

In [90]:
# Append the assembled 'features' vector column to the original columns.
output = assembler.transform(dataset=data)
output.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)
 |-- features: vector (nullable = true)



In [91]:
# Keep only what the model needs: the feature vector and the label.
final_data = output.select(['features', 'Yearly Amount Spent'])
final_data.show()

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
|[33.8710378793419...|   637.102447915074|
|[32.0215955013870...|  521.5721747578274|
|[32.7391429383803...|  549.9041461052942|
|[33.9877728956856...|  570.2004089636196|
|[31.9365486184489...|  427.1993848953282|
|[33.9925727749537...|  492.6060127179966|
|[33.8793608248049...|  522.3374046069357|
|[29.5324289670579...|  408.6403510726275|
|[33.1903340437226...|  573.4158673313865|
|[32.3879758531538...|  470.4527333009554|
|[30.7377203726281...|  461.7807421962299|
|[32.1253868972878...| 457.84769594494855|
|[32.3388993230671...| 407.70454754954415|
|[32.1878120459321...|  452.3156754800354|
|[32.6178560628234...|   605.061038804892|
+----------

In [92]:
# Train/test split (70/30). A fixed seed keeps the split, and therefore every
# metric and prediction below, reproducible across kernel restarts.
train_Data, test_Data = final_data.randomSplit([0.7, 0.3], seed=42)

In [93]:
# Rows in the training split.
train_Data.count()

354

In [96]:
# Rows in the test split.
test_Data.count()

146

In [97]:
# Only the label column is non-default; features are read from the
# 'features' column built by the VectorAssembler.
lr = LinearRegression(labelCol='Yearly Amount Spent')
lr_model = lr.fit(dataset=train_Data)

22/04/27 12:35:45 WARN Instrumentation: [c58af050] regParam is zero, which might cause numerical instability and overfitting.
22/04/27 12:35:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/04/27 12:35:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


In [101]:
# Residuals of the e-commerce model on the held-out test set. Evaluate lr_model
# here: `test_results` belongs to the earlier sample_linear_regression_data model
# and would show that model's residuals on a fresh Restart & Run All.
test_result = lr_model.evaluate(test_Data)
test_result.residuals.show()

+-------------------+
|          residuals|
+-------------------+
|-18.819241175773946|
|-20.106232273358504|
| -20.30566147436103|
| -18.52666609829308|
|-15.828383402348752|
|-15.448169968413223|
|-16.244206278614005|
|-16.991249435095366|
|-17.634953839820668|
|-14.035434530810342|
| -17.19452904821249|
|-13.150304384104116|
|-17.756260599129206|
|-14.364167599850505|
| -8.539930374596933|
|-12.482379256247745|
|-14.093447087100802|
| -9.900680133656351|
| -8.302449628170244|
|-10.108596256021178|
+-------------------+
only showing top 20 rows



In [105]:
# Evaluation of the e-commerce model on the held-out test set.
test_result = lr_model.evaluate(test_Data)
print(f'MSE {test_result.meanSquaredError}')
print(f'MAE {test_result.meanAbsoluteError}')
print(f'RMSE {test_result.rootMeanSquaredError}')

MSE 100.88587354405975
MAE 7.901523141579479
RMSE 10.044196012825504


In [102]:
# Label statistics for context on the error metrics (the vector column is not summarised).
final_data_stats = final_data.describe()
final_data_stats.show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                500|
|   mean|  499.3140382585909|
| stddev|   79.3147815497068|
|    min| 256.67058229005585|
|    max|  765.5184619388373|
+-------+-------------------+



In [104]:
# R^2 on the held-out test set: the share of the variance in Yearly Amount Spent
# explained by the model (about 0.985, i.e. ~98.5%, in the run shown below).
test_result.r2

0.9850231216769141

In [106]:
# Mimicking deployment: strip the label so only features are available.
unlabeled_data = test_Data.select(test_Data['features'])

In [107]:
# Feature vectors only (truncated in the display).
unlabeled_data.show(n=20)

+--------------------+
|            features|
+--------------------+
|[29.5324289670579...|
|[30.3931845423455...|
|[30.5743636841713...|
|[30.8794843441274...|
|[30.9716756438877...|
|[31.2606468698795...|
|[31.2681042107507...|
|[31.3895854806643...|
|[31.5147378578019...|
|[31.5316044825729...|
|[31.5702008293202...|
|[31.6005122003032...|
|[31.6253601348306...|
|[31.6610498227460...|
|[31.7242025238451...|
|[31.7366356860502...|
|[31.7656188210424...|
|[31.8124825597242...|
|[31.8164283341993...|
|[31.8512531286083...|
+--------------------+
only showing top 20 rows



In [108]:
# Predict Yearly Amount Spent for the unlabeled rows.
predictions = lr_model.transform(dataset=unlabeled_data)

In [109]:
# Each feature vector with its predicted yearly spend.
predictions.show(n=20)

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[29.5324289670579...| 398.0010716406839|
|[30.3931845423455...|332.02053460853745|
|[30.5743636841713...| 441.7788666994559|
|[30.8794843441274...|494.43224746154965|
|[30.9716756438877...|487.89826221658404|
|[31.2606468698795...|  422.730516944574|
|[31.2681042107507...| 427.7769825383905|
|[31.3895854806643...| 409.1522812868336|
|[31.5147378578019...| 494.9801588065468|
|[31.5316044825729...|433.13957083860237|
|[31.5702008293202...| 563.9647761733197|
|[31.6005122003032...|461.21852999510907|
|[31.6253601348306...|381.55925736158724|
|[31.6610498227460...|417.30223229787407|
|[31.7242025238451...|510.01162281370193|
|[31.7366356860502...|494.27651330332515|
|[31.7656188210424...|501.12066342135836|
|[31.8124825597242...| 396.5445126058346|
|[31.8164283341993...| 519.5459437278741|
|[31.8512531286083...|  465.064982461186|
+--------------------+------------

## Logistic Regression

In [4]:
from pyspark.sql import SparkSession
# Start the Spark session for the logistic regression section
# (app name typo 'Logisti Reg' fixed).
spark = SparkSession.builder.appName('Logistic Reg').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/27 15:56:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [12]:
from pyspark.ml.classification import LogisticRegression

In [17]:
# Binary-labelled libsvm sample: 'label' (0.0/1.0) plus sparse 692-dim features.
df = spark.read.load('sample_libsvm_data.txt', format='libsvm')

22/04/27 16:02:49 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.


In [18]:
# First rows of the classification data.
df.show(n=20)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [19]:
# Logistic regression estimator with default settings; its default input columns
# line up with the 'label'/'features' columns of the libsvm data.
log_model = LogisticRegression()

In [26]:
# Fit on the full dataset (a train/test split follows below).
log = log_model.fit(dataset=df)

In [21]:
# Training summary of the model fitted on the full dataset.
log_summary = log.summary

In [25]:
# The summary's predictions add rawPrediction, probability and prediction columns.
log_summary.predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [28]:
# 70/30 train/test split with a fixed seed so the evaluation is reproducible.
lr_train, lr_test = df.randomSplit([0.7, 0.3], seed=42)

In [29]:
# Refit on the training split only.
fit_final = log_model.fit(dataset=lr_train)

In [30]:
# evaluate() returns a summary object (not a DataFrame); its .predictions
# attribute holds the scored test rows used below.
preds = fit_final.evaluate(lr_test)

In [33]:
# Scored test rows: raw scores, class probabilities and predicted label.
test_predictions = preds.predictions
test_predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[95,96,97,12...|[17.3167238373939...|[0.99999996983923...|       0.0|
|  0.0|(692,[98,99,100,1...|[22.5017389061392...|[0.99999999983110...|       0.0|
|  0.0|(692,[121,122,123...|[26.0715340386861...|[0.99999999999524...|       0.0|
|  0.0|(692,[122,123,124...|[14.9239377946288...|[0.99999966992242...|       0.0|
|  0.0|(692,[123,124,125...|[34.8949304474571...|[0.99999999999999...|       0.0|
|  0.0|(692,[124,125,126...|[32.3513316061859...|[0.99999999999999...|       0.0|
|  0.0|(692,[124,125,126...|[21.0282594113961...|[0.99999999926287...|       0.0|
|  0.0|(692,[126,127,128...|[30.8578849975642...|[0.99999999999996...|       0.0|
|  0.0|(692,[126,127,128...|[11.3141563174258...|[0.99998780114564...|       0.0|
|  0.0|(692,[126

In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [36]:
# Area under the ROC curve, computed from rawPrediction vs. label (all pyspark defaults).
my_eval = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='label',
    metricName='areaUnderROC',
)

In [37]:
test_predictions = preds.predictions
my_final_roc = my_eval.evaluate(dataset=test_predictions)



In [38]:
# Area under ROC on the held-out split (the evaluator above sets no metricName, so its default metric is used).
my_final_roc

1.0

# Tree Methods Example

In [2]:
import os

import findspark

# Prefer SPARK_HOME from the environment instead of a machine-specific path.
# Fall back to the path this notebook was originally run with, so the existing
# setup keeps working unchanged.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.2.1-bin-hadoop2.7/')
findspark.init(SPARK_HOME)

In [3]:
from pyspark.sql import SparkSession

# Return the active session if one exists, otherwise start one named 'trees'.
spark = (
    SparkSession.builder
    .appName('trees')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/30 14:07:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, GBTClassifier

In [5]:
# LibSVM file -> DataFrame with `label` and sparse-vector `features` columns.
data = spark.read.load('sample_libsvm_data.txt', format='libsvm')

22/04/30 14:10:46 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
                                                                                

In [6]:
data.show(n=20)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [7]:
# 70/30 train/test split with a fixed seed, so the accuracies below are
# reproducible across kernel restarts.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

In [9]:
# All three estimators read the same columns. Spelling them out for each one
# keeps the cell consistent.
# The seed is explicit because pyspark's default seed comes from hash() of the
# class name. That hash changes between Python processes (unless PYTHONHASHSEED
# is fixed), so random forests would otherwise differ after a kernel restart.

# Decision tree classifier
dtc = DecisionTreeClassifier(labelCol='label', featuresCol='features', seed=42)

# Random Forest Classifier
rfc = RandomForestClassifier(labelCol='label', featuresCol='features', seed=42)

# Gradient Boost Tree Classifier
gbtc = GBTClassifier(labelCol='label', featuresCol='features', seed=42)

In [10]:
# fitting the models, in order: decision tree, random forest, boosted trees
dtc_model, rfc_model, gbtc_model = [
    estimator.fit(train_data) for estimator in (dtc, rfc, gbtc)
]

                                                                                

In [None]:
# Predictions on the held-out split, one frame per fitted model.
# NOTE: the forest's frame is named `rfr_preds` (not `rfc_preds`); the next cell reads that name.
dtc_preds, rfr_preds, gbtc_preds = [
    fitted.transform(test_data) for fitted in (dtc_model, rfc_model, gbtc_model)
]

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fraction of test rows whose `prediction` equals `label` (default column names).
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')

dtc_accuracy = acc_eval.evaluate(dtc_preds)
print(f'DTC accuracy: {dtc_accuracy}')
rfc_accuracy = acc_eval.evaluate(rfr_preds)
print(f'RFC accuracy: {rfc_accuracy}')
gbtc_accuracy = acc_eval.evaluate(gbtc_preds)
print(f'GBTC accuracy: {gbtc_accuracy}')

## K-means Clustering

Examples & More

In [1]:
import os

import findspark

# Prefer SPARK_HOME from the environment instead of a machine-specific path.
# Fall back to the path this notebook was originally run with, so the existing
# setup keeps working unchanged.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.2.1-bin-hadoop2.7/')
findspark.init(SPARK_HOME)

In [2]:
from pyspark.sql import SparkSession

# Return the active session if one exists, otherwise start one named 'kmeans'.
spark = (
    SparkSession.builder
    .appName('kmeans')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/03 11:17:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.ml.clustering import KMeans

In [4]:
# LibSVM file -> DataFrame with `label` and sparse-vector `features` columns.
data = spark.read.load('sample_kmeans_data.txt', format='libsvm')

22/05/03 11:20:40 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
                                                                                

In [5]:
data.show(n=20)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [8]:
# Clustering is unsupervised, so keep only the feature vectors and drop `label`.
final_data = data.select(data['features'])
final_data.show()

+--------------------+
|            features|
+--------------------+
|           (3,[],[])|
|(3,[0,1,2],[0.1,0...|
|(3,[0,1,2],[0.2,0...|
|(3,[0,1,2],[9.0,9...|
|(3,[0,1,2],[9.1,9...|
|(3,[0,1,2],[9.2,9...|
+--------------------+



In [25]:
# Three clusters; the fixed seed makes the centroid initialisation repeatable.
kmeans = KMeans(k=3, seed=1)

In [26]:
kmeans_model = kmeans.fit(dataset=final_data)

In [27]:
# Within-set sum of squared errors on the training data (the summary's trainingCost).
kmeans_summary = kmeans_model.summary
wssse = kmeans_summary.trainingCost
wssse

0.07499999999994544

In [28]:
# One center per cluster (k=3 was set when building the estimator above).
centers = kmeans_model.clusterCenters()

In [29]:
# Displayed as a list of numpy arrays, one per cluster.
centers

[array([9.1, 9.1, 9.1]), array([0.05, 0.05, 0.05]), array([0.2, 0.2, 0.2])]

In [30]:
# Assign each point to its nearest center; the `prediction` column holds the cluster index.
predictions = kmeans_model.transform(dataset=final_data)
predictions.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         1|
|(3,[0,1,2],[0.1,0...|         1|
|(3,[0,1,2],[0.2,0...|         2|
|(3,[0,1,2],[9.0,9...|         0|
|(3,[0,1,2],[9.1,9...|         0|
|(3,[0,1,2],[9.2,9...|         0|
+--------------------+----------+



## Recommender System

In [1]:
import os

import findspark

# Prefer SPARK_HOME from the environment instead of a machine-specific path.
# Fall back to the path this notebook was originally run with, so the existing
# setup keeps working unchanged.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-3.2.1-bin-hadoop2.7/')
findspark.init(SPARK_HOME)

In [2]:
from pyspark.sql import SparkSession

# Return the active session if one exists, otherwise start one named 'rec_s'.
spark = (
    SparkSession.builder
    .appName('rec_s')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/05 10:56:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# models
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
# Movie Lens data
# Movie Lens data: the header row supplies column names; inferSchema types the columns.
data = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('movielens_ratings.csv')
)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [5]:
# Column types here come from inferSchema=True in the read above (integer ids, double rating).
data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [6]:
data.show(n=5)

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
+-------+------+------+
only showing top 5 rows



In [8]:
# count / mean / stddev / min / max for every column
summary_stats = data.describe()
summary_stats.show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [9]:
# split
# split: 80/20 with a fixed seed, so the RMSE and recommendations below are
# reproducible across kernel restarts.
training, test = data.randomSplit([0.8, 0.2], seed=42)

In [10]:
# coldStartStrategy='drop' handles users/movies that appear only in the random
# test split. With the default ('nan'), ALS predicts NaN for them, and a single
# NaN makes the RMSE evaluation NaN. 'drop' removes those rows instead.
# The explicit seed makes factor initialisation repeatable; pyspark's default
# seed is derived from hash() of the class name, which varies between processes.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop',
    seed=42,
)

In [11]:
model = als.fit(dataset=training)

22/05/05 11:04:14 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/05 11:04:14 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/05/05 11:04:14 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [13]:
# Adds a `prediction` column with the model's estimated rating for each test row.
preds = model.transform(dataset=test)

                                                                                

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      1|   1.0|     4| 0.6757952|
|      1|   4.0|    15| 1.8757942|
|      6|   1.0|     1|0.41104212|
|      6|   1.0|     4| 1.4577866|
|      3|   1.0|     9|0.15524265|
|      3|   2.0|    22| 1.4984908|
|      3|   1.0|    26|  0.381118|
|      5|   1.0|     6|0.12843809|
|      5|   3.0|    16| 1.1454937|
|      5|   2.0|    26| 2.1097898|
|      4|   1.0|    14| 2.0674412|
|      4|   1.0|    19| 1.4846786|
|      2|   4.0|     8| 3.0584147|
|      2|   4.0|    10|   4.51926|
|      2|   1.0|    19|0.43675283|
|      2|   4.0|    21|0.93360054|
|      0|   1.0|     3| 1.3465576|
|      0|   1.0|     5| 1.4165176|
|      0|   1.0|     6| 1.4117017|
|      0|   1.0|    22| 0.8711265|
+-------+------+------+----------+
only showing top 20 rows



In [14]:
preds.show(n=20)



+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      1|   1.0|     4| 0.6757952|
|      1|   4.0|    15| 1.8757942|
|      6|   1.0|     1|0.41104212|
|      6|   1.0|     4| 1.4577866|
|      3|   1.0|     9|0.15524265|
|      3|   2.0|    22| 1.4984908|
|      3|   1.0|    26|  0.381118|
|      5|   1.0|     6|0.12843809|
|      5|   3.0|    16| 1.1454937|
|      5|   2.0|    26| 2.1097898|
|      4|   1.0|    14| 2.0674412|
|      4|   1.0|    19| 1.4846786|
|      2|   4.0|     8| 3.0584147|
|      2|   4.0|    10|   4.51926|
|      2|   1.0|    19|0.43675283|
|      2|   4.0|    21|0.93360054|
|      0|   1.0|     3| 1.3465576|
|      0|   1.0|     5| 1.4165176|
|      0|   1.0|     6| 1.4117017|
|      0|   1.0|    22| 0.8711265|
+-------+------+------+----------+
only showing top 20 rows



                                                                                

In [15]:
# Root-mean-squared error between the true `rating` and the ALS `prediction`.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [18]:
# NOTE(review): if `preds` contains NaN predictions (ALS's default cold-start
# behaviour for users/items unseen in training), this value is NaN.
rmse = evaluator.evaluate(dataset=preds)
rmse

1.665487075913309

In [22]:
# The (movieId, userId) pairs for user 11 from the test split; there is no rating column to leak.
single_user = test.where(test['userId'] == 11).select('movieId', 'userId')

In [23]:
single_user.show(n=20)

+-------+------+
|movieId|userId|
+-------+------+
|     13|    11|
|     19|    11|
|     21|    11|
|     30|    11|
|     36|    11|
|     47|    11|
|     67|    11|
|     75|    11|
|     77|    11|
|     78|    11|
|     80|    11|
+-------+------+



In [24]:
# Predicted ratings for user 11's test movies.
# (The name keeps its original spelling because the next cell reads it.)
reccomendations = model.transform(dataset=single_user)

In [28]:
# Highest predicted rating first.
reccomendations.orderBy(reccomendations['prediction'].desc()).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     80|    11|  4.049386|
|     75|    11| 3.7398398|
|     30|    11| 3.6486545|
|     36|    11| 3.4465692|
|     67|    11| 2.3217723|
|     47|    11| 2.1774669|
|     21|    11| 1.1505594|
|     78|    11| 0.9832475|
|     19|    11| 0.6411476|
|     13|    11| 0.3785353|
|     77|    11|0.13235196|
+-------+------+----------+



## NLP (Natural Language Processing)