In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('aggs').getOrCreate()

In [3]:
df = spark.read.csv('sales_info.csv',inferSchema=True, header=True)

In [4]:
df

DataFrame[Company: string, Person: string, Sales: double]

In [5]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [6]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [12]:
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [16]:
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [17]:
df.groupBy('Company').sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [18]:
df.groupBy('Company').min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [24]:
df.groupBy('Company').min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [27]:
df.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [28]:
group_data = df.groupBy('Company')

In [29]:
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [31]:
from pyspark.sql.functions import countDistinct,avg,stddev

In [32]:
df.select(avg('Sales')).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [34]:
df.select(avg('Sales').alias('Average sales')).show()

+-----------------+
|    Average sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [35]:
df.select(stddev('Sales')).show() #Now I dont want long column name and result so formatting this entire result

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [36]:
from pyspark.sql.functions import format_number

In [42]:
sales_std=df.select(stddev('Sales').alias('std'))

In [43]:
sales_std.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [45]:
sales_std.select(format_number('std',2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



In [48]:
df.orderBy('Sales').show()  #Ascending order

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [49]:
df.orderBy(df['Sales'].desc()).show()    #Descending order

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [50]:
from pyspark.sql import SparkSession

In [51]:
spark = SparkSession.builder.appName('miss').getOrCreate()

In [52]:
 df = spark.read.csv('ContainsNull.csv', header=True,inferSchema=True)

In [53]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [54]:
df.na.drop().show() #This drops everything even one column is null in a row

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [56]:
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [57]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [59]:
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [61]:
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [62]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [64]:
df.na.fill('FILL VALUE').show()

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [66]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [67]:
from pyspark.sql.functions import mean

In [68]:
mean_val = df.select(mean(df['Sales'])).collect()

In [75]:
mean_val

[Row(avg(Sales)=400.5)]

In [78]:
mean_sales = mean_val[0][0]

In [79]:
mean_sales

400.5

In [72]:
df.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [73]:
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [80]:
#dates and timestamp

In [84]:
spark=SparkSession.builder.appName('dates').getOrCreate()

In [85]:
df = spark.read.csv('appl_stock.csv', header=True,inferSchema=True)

In [86]:
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [87]:
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [88]:
df.select(['Date','Open']).show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top

In [90]:
from pyspark.sql.functions import (dayofmonth,dayofyear,hour,
                                   month,year,weekofyear,format_number,
                                   date_format)

In [92]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [93]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



In [94]:
df.select(year(df['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [97]:
df.columns   #To get column names

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [1]:
##Linear regression

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('lrex').getOrCreate()

In [6]:
from pyspark.ml.regression import LinearRegression

In [8]:
training = spark.read.format('libsvm').load('sample_linear_regression_data.txt')

In [13]:
training.show()

+-------------------+--------------------+
|              label|            features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
|  7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
|  5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+----------

In [14]:
lr = LinearRegression(featuresCol='features', labelCol='label',predictionCol='predictions')

In [15]:
lrModel = lr.fit(training)

In [17]:
lrModel.coefficients

DenseVector([0.0073, 0.8314, -0.8095, 2.4412, 0.5192, 1.1535, -0.2989, -0.5129, -0.6197, 0.6956])

In [18]:
lrModel.intercept

0.14228558260358093

In [20]:
training_summary = lrModel.summary

In [21]:
training_summary.r2

0.027839179518600154

In [24]:
training_summary.rootMeanSquaredError

10.16309157133015

In [25]:
all_data = spark.read.format('libsvm').load('sample_linear_regression_data.txt')

In [30]:
train_data,test_data= all_data.randomSplit([0.7,0.3])

In [34]:
train_data.describe().show()

+-------+-------------------+
|summary|              label|
+-------+-------------------+
|  count|                346|
|   mean| 0.3877705551275605|
| stddev| 10.363330805501386|
|    min|-28.571478869743427|
|    max|  27.78383192005107|
+-------+-------------------+



In [35]:
test_data.describe().show()

+-------+--------------------+
|summary|               label|
+-------+--------------------+
|  count|                 155|
|   mean|-0.03527298164158...|
| stddev|  10.243047092578733|
|    min| -28.046018037776633|
|    max|  24.290551295953957|
+-------+--------------------+



In [37]:
correct_model = lr.fit(train_data)

In [38]:
test_results = correct_model.evaluate(test_data)

In [39]:
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -27.64369784360104|
| -26.07704896664576|
|  -21.6338655317126|
| -21.59388092192396|
|-21.805355809351408|
|-21.390085695989146|
| -19.45598871619133|
|-20.961193880579682|
|-21.280169981354454|
|-18.187328612397028|
|-16.829869394982175|
|-14.670178358227759|
|-17.571653567662437|
|-17.533330058229513|
|-14.351615171408858|
|-17.011947050335706|
| -13.76625619270842|
| -15.40327903151484|
| -10.41886596931166|
| -9.998913948828521|
+-------------------+
only showing top 20 rows



In [40]:
unlabeled_data = test_data.select('features')

In [41]:
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 20 rows



In [42]:
predictions = correct_model.transform(unlabeled_data)

In [43]:
predictions.show()

+--------------------+--------------------+
|            features|         predictions|
+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...| -0.4023201941755949|
|(10,[0,1,2,3,4,5,...|  3.1272230304496844|
|(10,[0,1,2,3,4,5,...|  -1.203594885206739|
|(10,[0,1,2,3,4,5,...|   1.720889883855557|
|(10,[0,1,2,3,4,5,...|  2.0225930197368713|
|(10,[0,1,2,3,4,5,...|  1.7227670806174258|
|(10,[0,1,2,3,4,5,...|  0.6100662432927502|
|(10,[0,1,2,3,4,5,...|  2.6859803145750503|
|(10,[0,1,2,3,4,5,...|  3.8514954104149464|
|(10,[0,1,2,3,4,5,...|  0.8606078797210801|
|(10,[0,1,2,3,4,5,...| 0.13766237367106876|
|(10,[0,1,2,3,4,5,...|  -1.281334207566815|
|(10,[0,1,2,3,4,5,...|   1.709644240091877|
|(10,[0,1,2,3,4,5,...|  2.1574723349172147|
|(10,[0,1,2,3,4,5,...|-0.41114308152226947|
|(10,[0,1,2,3,4,5,...|   2.682968541260263|
|(10,[0,1,2,3,4,5,...| -0.2098747384442829|
|(10,[0,1,2,3,4,5,...|  1.5361911363560714|
|(10,[0,1,2,3,4,5,...| -2.0604142421398373|
|(10,[0,1,2,3,4,5,...|  -2.46874

In [44]:
#Actual linear regression example

In [45]:
from pyspark.sql import SparkSession

In [46]:
spark = SparkSession.builder.appName('lr_example').getOrCreate()

In [47]:
from pyspark.ml.regression import LinearRegression

In [48]:
data= spark.read.csv('Ecommerce_Customers.csv', inferSchema=True,header=True)

In [49]:
data.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [54]:
for item in data.head(1)[0]:
    print(item)

mstephenson@fernandez.com
835 Frank TunnelWrightmouth, MI 82180-9605
Violet
34.49726772511229
12.65565114916675
39.57766801952616
4.0826206329529615
587.9510539684005


In [55]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [57]:
data.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [59]:
assembler = VectorAssembler(inputCols=['Email','Address','Avatar','Avg Session Length',
                                 'Time on App','Time on Website','Length of Membership',
                                    'Yearly Amount Spent'],outputCol='features') 

In [62]:
output = assembler.transform(data)

IllegalArgumentException: 'Data type StringType is not supported.'

In [1]:
#Logistic regression

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('mylogreg').getOrCreate()

In [4]:
from pyspark.ml.classification import LogisticRegression

In [5]:
my_data = spark.read.format('libsvm').load('sample_libsvm_data.txt')

In [6]:
my_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [7]:
my_log_reg_model = LogisticRegression()

In [8]:
fitted_logreg = my_log_reg_model.fit(my_data)

In [9]:
log_summary = fitted_logreg.summary

In [11]:
log_summary.predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [12]:
log_summary.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[127,128,129...|[39.9727764450750...|[1.0,4.3655982185...|       0.0|
|  1.0|(692,[158,159,160...|[-35.662380562160...|[3.25105944320044...|       1.0|
|  1.0|(692,[124,125,126...|[-39.336799621156...|[8.24603148700906...|       1.0|
|  1.0|(692,[152,153,154...|[-28.219286248176...|[5.55289803944932...|       1.0|
|  1.0|(692,[151,152,153...|[-28.142070329444...|[5.99865861146384...|       1.0|
|  0.0|(692,[129,130,131...|[37.8748140555402...|[1.0,3.5577649524...|       0.0|
|  1.0|(692,[158,159,160...|[-36.610101257122...|[1.26018713626709...|       1.0|
|  1.0|(692,[99,100,101,...|[-29.504314986871...|[1.53616833535573...|       1.0|
|  0.0|(692,[154,155,156...|[13.6899025280154...|[0.99999886616363...|       0.0|
|  0.0|(692,[127

In [13]:
lr_train,lr_test=my_data.randomSplit([0.7,0.3])

In [14]:
final_model = LogisticRegression()

In [15]:
fit_final = final_model.fit(lr_train)

In [17]:
prediction_and_labels = fit_final.evaluate(lr_test)

In [18]:
prediction_and_labels.predictions.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[100,101,102...|[11.8502672635437...|[0.99999286340224...|       0.0|
|  0.0|(692,[121,122,123...|[25.3445385254023...|[0.99999999999015...|       0.0|
|  0.0|(692,[124,125,126...|[38.7496950481521...|[1.0,1.4832735936...|       0.0|
|  0.0|(692,[124,125,126...|[34.1244990295316...|[0.99999999999999...|       0.0|
|  0.0|(692,[124,125,126...|[26.1224507601587...|[0.99999999999547...|       0.0|
|  0.0|(692,[124,125,126...|[26.1909278053026...|[0.99999999999577...|       0.0|
|  0.0|(692,[125,126,127...|[47.8525421813543...|[1.0,1.6516004236...|       0.0|
|  0.0|(692,[127,128,129...|[20.9959760215867...|[0.99999999923868...|       0.0|
|  0.0|(692,[128,129,130...|[26.4766790154203...|[0.99999999999682...|       0.0|
|  0.0|(692,[155

In [19]:
from pyspark.ml.evaluation import (BinaryClassificationEvaluator, 
                                   MulticlassClassificationEvaluator)

In [20]:
my_eval = BinaryClassificationEvaluator()

In [22]:
my_final_roc=my_eval.evaluate(prediction_and_labels.predictions)

In [23]:
my_final_roc

1.0

In [24]:
#Logistic regression consulting project

In [25]:
from pyspark.sql import SparkSession

In [26]:
spark = SparkSession.builder.appName('logregconsult').getOrCreate()

In [27]:
data = spark.read.csv('customer_churn.csv', inferSchema=True, header=True)

In [29]:
data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [30]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [31]:
from pyspark.ml.feature import VectorAssembler

In [32]:
assembler =  VectorAssembler(inputCols=['Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'],outputCol='features')

In [33]:
output = assembler.transform(data)

In [34]:
final_data = output.select('features', 'churn')

In [35]:
final_data.show()

+--------------------+-----+
|            features|churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
|[37.0,9191.58,0.0...|    1|
|[48.0,10356.02,0....|    1|
|[44.0,11331.58,1....|    1|
|[32.0,9885.12,1.0...|    1|
|[43.0,14062.6,1.0...|    1|
|[40.0,8066.94,1.0...|    1|
|[30.0,11575.37,1....|    1|
|[45.0,8771.02,1.0...|    1|
|[45.0,8988.67,1.0...|    1|
|[40.0,8283.32,1.0...|    1|
|[41.0,6569.87,1.0...|    1|
|[38.0,10494.82,1....|    1|
|[45.0,8213.41,1.0...|    1|
|[43.0,11226.88,0....|    1|
|[53.0,5515.09,0.0...|    1|
|[46.0,8046.4,1.0,...|    1|
+--------------------+-----+
only showing top 20 rows



In [36]:
train_churn,test_churn = final_data.randomSplit([0.7,0.3])

In [38]:
from pyspark.ml.classification import LogisticRegression

In [39]:
lr_churn  = LogisticRegression(labelCol='churn')

In [40]:
fitted_churn_model = lr_churn.fit(train_churn)

In [41]:
training_sum = fitted_churn_model.summary

In [42]:
training_sum.predictions.describe().show()

+-------+------------------+------------------+
|summary|             churn|        prediction|
+-------+------------------+------------------+
|  count|               625|               625|
|   mean|             0.168|            0.1296|
| stddev|0.3741657386773942|0.3361318422652271|
|    min|               0.0|               0.0|
|    max|               1.0|               1.0|
+-------+------------------+------------------+



In [43]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [44]:
pred_and_lables = fitted_churn_model.evaluate(test_churn)

In [45]:
pred_and_lables.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[25.0,9672.03,0.0...|    0|[4.34067969823607...|[0.98713986726055...|       0.0|
|[29.0,10203.18,1....|    0|[3.45679358903725...|[0.96943309542265...|       0.0|
|[29.0,12711.15,0....|    0|[4.97862470003543...|[0.99316353599748...|       0.0|
|[29.0,13255.05,1....|    0|[3.88013430799746...|[0.97976966520375...|       0.0|
|[30.0,10183.98,1....|    0|[2.68686348549025...|[0.93624702367791...|       0.0|
|[31.0,5387.75,0.0...|    0|[2.28101282658665...|[0.90729227391944...|       0.0|
|[31.0,9574.89,0.0...|    0|[2.87765206344043...|[0.94673057707939...|       0.0|
|[31.0,10058.87,1....|    0|[4.06804351786830...|[0.98317702233938...|       0.0|
|[31.0,11743.24,0....|    0|[6.37859410176975...|[0.99830536884658...|       0.0|
|[32.0,10716.75,

In [46]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='churn')

In [47]:
auc = churn_eval.evaluate(pred_and_lables.predictions)

In [48]:
auc

0.7980676328502415

In [49]:
final_lr_model = lr_churn.fit(final_data)

In [50]:
new_customers = spark.read.csv('new_customers.csv', inferSchema=True, header=True)

In [51]:
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [52]:
test_new_customers = assembler.transform(new_customers)

In [53]:
test_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [54]:
final_results = final_lr_model.transform(test_new_customers)

In [55]:
final_results.show()

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+--------------------+--------------------+--------------------+----------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|            features|       rawPrediction|         probability|prediction|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+--------------------+--------------------+--------------------+----------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|[37.0,9935.53,1.0...|[2.22168692910057...|[0.90218017010563...|       0.0|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|[23.0,7526.94,1.0...|[-6.2207539269959...|[0.00198380274076...|       

In [58]:
final_results.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+



In [59]:
#Decision tree algorithm

In [60]:
from pyspark.sql import SparkSession

In [61]:
spark = SparkSession.builder.appName('mytree').getOrCreate()

In [62]:
from pyspark.ml import Pipeline

In [64]:
from pyspark.ml.classification import (RandomForestClassifier,GBTClassifier,
                                    DecisionTreeClassifier)

In [65]:
data = spark.read.format('libsvm').load('sample_libsvm_data.txt')

In [66]:
data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows



In [67]:
train_data,test_data = data.randomSplit([0.7,0.3])

In [68]:
dtc = DecisionTreeClassifier()
rfc = RandomForestClassifier(numTrees=100)
gbt = GBTClassifier()

In [73]:
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [74]:
dtc_preds=dtc_model.transform(test_data)
rfc_preds=rfc_model.transform(test_data)
gbt_preds=gbt_model.transform(test_data)

In [75]:
dtc_preds.show()

+-----+--------------------+-------------+-----------+----------+
|label|            features|rawPrediction|probability|prediction|
+-----+--------------------+-------------+-----------+----------+
|  0.0|(692,[95,96,97,12...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[122,123,148...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[126,127,128...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[126,127,128...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[151,152,153...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[152,153,154...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[152,153,154...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[153,154,155...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[153,154,155...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  0.0|(692,[154,155,156...|   [31.0,0.0]|  [1.0,0.0]|       0.0|
|  1.0|(69

In [76]:
gbt_preds.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(692,[95,96,97,12...|[1.54350200272498...|[0.95635347857270...|       0.0|
|  0.0|(692,[122,123,148...|[1.54350200272498...|[0.95635347857270...|       0.0|
|  0.0|(692,[124,125,126...|[1.54350200272498...|[0.95635347857270...|       0.0|
|  0.0|(692,[124,125,126...|[1.54350200272498...|[0.95635347857270...|       0.0|
|  0.0|(692,[126,127,128...|[1.31249466989448...|[0.93245263743309...|       0.0|
|  0.0|(692,[126,127,128...|[1.54350200272498...|[0.95635347857270...|       0.0|
|  0.0|(692,[151,152,153...|[1.18007363109479...|[0.91373741361628...|       0.0|
|  0.0|(692,[152,153,154...|[1.54350200272498...|[0.95635347857270...|       0.0|
|  0.0|(692,[152,153,154...|[1.54350200272498...|[0.95635347857270...|       0.0|
|  0.0|(692,[153

In [77]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [78]:
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')

In [79]:
print('DTC ACCURACY:')
acc_eval.evaluate(dtc_preds)

DTC ACCURACY:


1.0

In [80]:
print('RFC ACCURACY:')
acc_eval.evaluate(rfc_preds)

RFC ACCURACY:


1.0

In [82]:
print('GBT ACCURACY:')
acc_eval.evaluate(gbt_preds)

GBT ACCURACY:


1.0

In [84]:
#K-means clustering

In [85]:
from pyspark.sql import SparkSession

In [86]:
spark = SparkSession.builder.appName('cluster').getOrCreate()

In [87]:
from pyspark.ml.clustering import KMeans

In [88]:
dataset = spark.read.format('libsvm').load('sample_kmeans_data.txt')

In [89]:
dataset.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|           (3,[],[])|
|  1.0|(3,[0,1,2],[0.1,0...|
|  2.0|(3,[0,1,2],[0.2,0...|
|  3.0|(3,[0,1,2],[9.0,9...|
|  4.0|(3,[0,1,2],[9.1,9...|
|  5.0|(3,[0,1,2],[9.2,9...|
+-----+--------------------+



In [90]:
final_data=dataset.select('features')

In [92]:
final_data.show()

+--------------------+
|            features|
+--------------------+
|           (3,[],[])|
|(3,[0,1,2],[0.1,0...|
|(3,[0,1,2],[0.2,0...|
|(3,[0,1,2],[9.0,9...|
|(3,[0,1,2],[9.1,9...|
|(3,[0,1,2],[9.2,9...|
+--------------------+



In [93]:
kmeans = KMeans().setK(2).setSeed(1) #Here 2 defines no of clusters and we can give any number u want

In [94]:
model = kmeans.fit(final_data)

In [95]:
wssse = model.computeCost(final_data)

In [97]:
print(wssse)

0.11999999999994547


In [98]:
centers = model.clusterCenters()

In [99]:
centers

[array([ 0.1,  0.1,  0.1]), array([ 9.1,  9.1,  9.1])]

In [102]:
results=model.transform(final_data)

In [103]:
results.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|           (3,[],[])|         0|
|(3,[0,1,2],[0.1,0...|         0|
|(3,[0,1,2],[0.2,0...|         0|
|(3,[0,1,2],[9.0,9...|         1|
|(3,[0,1,2],[9.1,9...|         1|
|(3,[0,1,2],[9.2,9...|         1|
+--------------------+----------+



In [104]:
#K-means project

In [105]:
from pyspark.sql import SparkSession

In [106]:
spark = SparkSession.builder.appName('cluster').getOrCreate()

In [107]:
dataset = spark.read.csv('hack_data.csv', header=True,inferSchema=True)

In [109]:
dataset.head()

Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37)

In [110]:
from pyspark.ml.clustering import KMeans

In [111]:
from pyspark.ml.feature import VectorAssembler

In [112]:
dataset.columns

['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'Location',
 'WPM_Typing_Speed']

In [113]:
feat_cols = ['Session_Connection_Time',
 'Bytes Transferred',
 'Kali_Trace_Used',
 'Servers_Corrupted',
 'Pages_Corrupted',
 'WPM_Typing_Speed']

In [114]:
assembler = VectorAssembler(inputCols=feat_cols,outputCol='features')

In [115]:
final_data = assembler.transform(dataset)

In [117]:
from pyspark.ml.feature import StandardScaler

In [118]:
scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures')

In [119]:
scaler_model = scaler.fit(final_data)
cluster_final_data  =scaler_model.transform(final_data)

In [120]:
cluster_final_data.printSchema()

root
 |-- Session_Connection_Time: double (nullable = true)
 |-- Bytes Transferred: double (nullable = true)
 |-- Kali_Trace_Used: integer (nullable = true)
 |-- Servers_Corrupted: double (nullable = true)
 |-- Pages_Corrupted: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- WPM_Typing_Speed: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)



In [121]:
kmeans2 = KMeans(featuresCol='scaledFeatures',k=2)
kmeans3 = KMeans(featuresCol='scaledFeatures',k=3)

In [122]:
model_k2 = kmeans2.fit(cluster_final_data)
model_k3 = kmeans3.fit(cluster_final_data)

In [124]:
model_k3.transform(cluster_final_data).select('prediction').show()

+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [125]:
model_k3.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   88|
|         2|   79|
|         0|  167|
+----------+-----+



In [126]:
model_k2.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+



In [127]:
#Recommender systems

In [128]:
from pyspark.sql import SparkSession

In [129]:
spark = SparkSession.builder.appName('rec').getOrCreate()

In [130]:
from pyspark.ml.recommendation import ALS

In [132]:
from pyspark.ml.evaluation import RegressionEvaluator

In [135]:
data = spark.read.csv('movielens_ratings.csv',inferSchema=True,header=True)

In [136]:
data.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [144]:
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [145]:
data.groupBy('rating').count().show()

+------+-----+
|rating|count|
+------+-----+
|   1.0|  941|
|   4.0|   99|
|   3.0|  179|
|   2.0|  207|
|   5.0|   75|
+------+-----+



In [146]:
training,test = data.randomSplit([0.8,0.2])

In [147]:
als = ALS(maxIter=5,regParam=0.01,userCol='userId',itemCol='movieId',
              ratingCol='rating')

In [148]:
model = als.fit(training)

In [149]:
predictions = model.transform(test)

In [150]:
predictions.show()

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|     31|   1.0|    26|-0.26297855|
|     31|   1.0|    29|0.102428176|
|     85|   1.0|    28|  0.7843231|
|     85|   1.0|    12|  1.7877965|
|     85|   5.0|    16|  1.3860276|
|     85|   2.0|    20|  1.0848668|
|     85|   1.0|     4|  2.4323997|
|     85|   1.0|     2|  2.9666343|
|     65|   1.0|    28|   4.166763|
|     65|   1.0|    22|  1.0912039|
|     65|   1.0|    19|0.030475974|
|     65|   1.0|     4| 0.90965366|
|     65|   1.0|    24|-0.01591611|
|     53|   1.0|    25|  2.8690052|
|     53|   3.0|    14|  3.7329152|
|     78|   1.0|    27|  0.3808039|
|     78|   1.0|    12|   0.922962|
|     78|   1.0|    24| 0.91781664|
|     78|   1.0|     2| 0.30566475|
|     34|   1.0|    28|  -1.716466|
+-------+------+------+-----------+
only showing top 20 rows



In [151]:
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',
                               predictionCol='prediction')

In [152]:
rmse = evaluator.evaluate(predictions)

In [153]:
print('RMSE')
print(rmse)

RMSE
1.6629900473675454


In [155]:
single_user = test.filter(test['userId']==11).select(['movieId','userId'])

In [156]:
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|     10|    11|
|     11|    11|
|     13|    11|
|     25|    11|
|     27|    11|
|     35|    11|
|     37|    11|
|     45|    11|
|     51|    11|
|     64|    11|
|     70|    11|
|     72|    11|
|     79|    11|
|     81|    11|
|     90|    11|
+-------+------+



In [157]:
recommendations =model.transform(single_user)

In [159]:
recommendations.orderBy('prediction',ascending=False).show()

+-------+------+-----------+
|movieId|userId| prediction|
+-------+------+-----------+
|     64|    11|   5.265073|
|     81|    11|  4.6323028|
|     13|    11|   4.444332|
|     27|    11|  4.2127395|
|     90|    11|  3.3112833|
|     11|    11|  3.1497402|
|     35|    11|   2.932283|
|     72|    11|   2.780455|
|     10|    11|  1.8160415|
|     79|    11|   1.693968|
|     45|    11|  1.6160933|
|     51|    11|  1.3429178|
|     70|    11|  1.2711279|
|     37|    11|  1.0005975|
|     25|    11|-0.43453005|
+-------+------+-----------+

