In [1]:
import findspark

In [2]:
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [7]:
df = spark.read.json('/home/ubuntu/classfiles/people.json')

# Section 1: Basic DataFrame Manipulation Practice

In [8]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [9]:
df.printSchema()
# learning data shcema

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [10]:
df.columns

['age', 'name']

In [12]:
df.describe().show()
#learning descriptive statistics of the data

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [14]:
 from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [16]:
# Changing data type
data_schema = [StructField('age', IntegerType(),True),
               StructField('name', StringType(),True)
              ]


In [18]:
final_struc = StructType(fields = data_schema)

In [20]:
df = spark.read.json('/home/ubuntu/classfiles/people.json', schema = final_struc)
# importing data with constructed shema

In [22]:
# selecting single column
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [25]:
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [28]:
df.withColumn('newage',df['age']*3).show()
#creating new columns out of existing columns

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    90|
|  19| Justin|    57|
+----+-------+------+



In [31]:
#naming new data columns
df.withColumnRenamed('age', 'my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      null|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [30]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [32]:
# construct df manipulation with SQL
df.createOrReplaceTempView('people')

In [35]:
results = spark.sql("SELECT age, age*2, name FROM people")

In [36]:
results.show()

+----+---------+-------+
| age|(age * 2)|   name|
+----+---------+-------+
|null|     null|Michael|
|  30|       60|   Andy|
|  19|       38| Justin|
+----+---------+-------+



# Section 2: Spark DataFrame Basic Operation

In [38]:
from pyspark.sql import SparkSession

In [39]:
spark = SparkSession.builder.appName('ops').getOrCreate()

In [45]:
df = spark.read.csv('/home/ubuntu/classfiles/appl_stock.csv', inferSchema = True, header = True)

In [47]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [55]:
#data filtering

In [64]:
df.filter('Close < 100').select(['open',"close"]).show()

+-----------------+-----------------+
|             open|            close|
+-----------------+-----------------+
|        92.699997|        93.699997|
|        94.730003|            94.25|
|        94.129997|        93.860001|
|        94.040001|        92.290001|
|        92.199997|        91.279999|
|        91.510002|        92.199997|
|        92.309998|92.08000200000001|
|        92.269997|            92.18|
|        92.290001|        91.860001|
|        91.849998|        90.910004|
|            91.32|90.83000200000001|
|            90.75|        90.279999|
|        90.209999|        90.360001|
|        90.370003|        90.900002|
|            90.82|        91.980003|
|        92.099998|            92.93|
|        93.519997|        93.519997|
|        93.870003|        93.480003|
|93.66999799999999|        94.029999|
|        94.139999|        95.970001|
+-----------------+-----------------+
only showing top 20 rows



In [67]:
df.filter(df['Close'] < 100).show()

+--------------------+-----------------+---------+---------+-----------------+---------+-----------------+
|                Date|             Open|     High|      Low|            Close|   Volume|        Adj Close|
+--------------------+-----------------+---------+---------+-----------------+---------+-----------------+
|2014-06-09 00:00:...|        92.699997|93.879997|    91.75|        93.699997| 75415000|        88.906324|
|2014-06-10 00:00:...|        94.730003|95.050003|    93.57|            94.25| 62777000|        89.428189|
|2014-06-11 00:00:...|        94.129997|94.760002|93.470001|        93.860001| 45681000|        89.058142|
|2014-06-12 00:00:...|        94.040001|94.120003|91.900002|        92.290001| 54749000|        87.568463|
|2014-06-13 00:00:...|        92.199997|92.440002|90.879997|        91.279999| 54525000|        86.610132|
|2014-06-16 00:00:...|        91.510002|    92.75|91.449997|        92.199997| 35561000|        87.483064|
|2014-06-17 00:00:...|        92.3099

In [71]:
df.filter( (df['Adj Close'] > 100) & (df['Open'] < 100)).show()

+--------------------+---------+----------+----+----------+---------+----------+
|                Date|     Open|      High| Low|     Close|   Volume| Adj Close|
+--------------------+---------+----------+----+----------+---------+----------+
|2015-08-24 00:00:...|94.870003|108.800003|92.0|103.120003|162206300|100.012029|
+--------------------+---------+----------+----+----------+---------+----------+



In [72]:
result = df.filter( (df['Adj Close'] > 100) & (df['Open'] < 100)).collect()

# Section 3: Groupby and Aggregation

In [1]:
from pyspark.sql import SparkSession

ModuleNotFoundError: No module named 'pyspark'

In [74]:
spark = SparkSession.builder.appName('aggs').getOrCreate()

In [76]:
df = spark.read.csv('/home/ubuntu/classfiles/sales_info.csv', inferSchema = True, header = True)

In [77]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [79]:
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [80]:
df.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [81]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [84]:
#aggregation
df.select(countDistinct('Sales').alias('distinct_counts')).show()

+---------------+
|distinct_counts|
+---------------+
|             11|
+---------------+



In [86]:
#oder by
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



# Section 4: Handling Missing Data

In [7]:
df2 = spark.read.csv('/home/ubuntu/classfiles/ContainsNull.csv', inferSchema = True, header = True)

In [8]:
df2.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [89]:
#single drop
df2.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [91]:
df2.na.drop(thresh = 2).show()
#drop based on number of missing values

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [92]:
df2.na.drop(how = 'all').show()
#only drop values that has all nulls

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [94]:
# drio only one column has missing
df2.na.drop(subset = ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [98]:
#fillna - string
df2.na.fill('FILL').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| FILL| null|
|emp3| FILL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [99]:
#fillna - number
df2.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [100]:
df.na.fill('NoName', subset = ['Name']).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [108]:
#fill null with mean

In [101]:
from pyspark.sql.functions import mean

In [102]:
mean_val = df.select(mean(df['Sales'])).collect()

In [105]:
mean_sales = mean_val[0][0]

In [107]:
df2.na.fill(mean_sales, ['Sales']).show()

+----+-----+-----------------+
|  Id| Name|            Sales|
+----+-----+-----------------+
|emp1| John|360.5833333333333|
|emp2| null|360.5833333333333|
|emp3| null|            345.0|
|emp4|Cindy|            456.0|
+----+-----+-----------------+



# Section 5.1: Modeling - Trees

# Problem: using Tree Model to predict feature importance on which preservative causes dog food to spoil

In [9]:
spark = SparkSession.builder.appName('RFmodel').getOrCreate()

In [11]:
data = spark.read.csv('/home/ubuntu/classfiles/dog_food.csv', inferSchema=True, header=True)

In [12]:
data.show()

+---+---+----+---+-------+
|  A|  B|   C|  D|Spoiled|
+---+---+----+---+-------+
|  4|  2|12.0|  3|    1.0|
|  5|  6|12.0|  7|    1.0|
|  6|  2|13.0|  6|    1.0|
|  4|  2|12.0|  1|    1.0|
|  4|  2|12.0|  3|    1.0|
| 10|  3|13.0|  9|    1.0|
|  8|  5|14.0|  5|    1.0|
|  5|  8|12.0|  8|    1.0|
|  6|  5|12.0|  9|    1.0|
|  3|  3|12.0|  1|    1.0|
|  9|  8|11.0|  3|    1.0|
|  1| 10|12.0|  3|    1.0|
|  1|  5|13.0| 10|    1.0|
|  2| 10|12.0|  6|    1.0|
|  1| 10|11.0|  4|    1.0|
|  5|  3|12.0|  2|    1.0|
|  4|  9|11.0|  8|    1.0|
|  5|  1|11.0|  1|    1.0|
|  4|  9|12.0| 10|    1.0|
|  5|  8|10.0|  9|    1.0|
+---+---+----+---+-------+
only showing top 20 rows



In [17]:
from pyspark.ml.feature import VectorAssembler

In [18]:
assembler = VectorAssembler(inputCols = ['A','B','C','D'], outputCol= 'features')

In [19]:
output = assembler.transform(data)

In [20]:
from pyspark.ml.classification import RandomForestClassifier

In [21]:
rfc = RandomForestClassifier(labelCol = 'Spoiled', featuresCol='features')

In [22]:
final_data = output.select('features','Spoiled')

In [24]:
final_data.show()

+-------------------+-------+
|           features|Spoiled|
+-------------------+-------+
| [4.0,2.0,12.0,3.0]|    1.0|
| [5.0,6.0,12.0,7.0]|    1.0|
| [6.0,2.0,13.0,6.0]|    1.0|
| [4.0,2.0,12.0,1.0]|    1.0|
| [4.0,2.0,12.0,3.0]|    1.0|
|[10.0,3.0,13.0,9.0]|    1.0|
| [8.0,5.0,14.0,5.0]|    1.0|
| [5.0,8.0,12.0,8.0]|    1.0|
| [6.0,5.0,12.0,9.0]|    1.0|
| [3.0,3.0,12.0,1.0]|    1.0|
| [9.0,8.0,11.0,3.0]|    1.0|
|[1.0,10.0,12.0,3.0]|    1.0|
|[1.0,5.0,13.0,10.0]|    1.0|
|[2.0,10.0,12.0,6.0]|    1.0|
|[1.0,10.0,11.0,4.0]|    1.0|
| [5.0,3.0,12.0,2.0]|    1.0|
| [4.0,9.0,11.0,8.0]|    1.0|
| [5.0,1.0,11.0,1.0]|    1.0|
|[4.0,9.0,12.0,10.0]|    1.0|
| [5.0,8.0,10.0,9.0]|    1.0|
+-------------------+-------+
only showing top 20 rows



In [26]:
rfc_model = rfc.fit(final_data)

In [28]:
final_data.head(1)

[Row(features=DenseVector([4.0, 2.0, 12.0, 3.0]), Spoiled=1.0)]

In [29]:
rfc_model.featureImportances

SparseVector(4, {0: 0.024, 1: 0.0191, 2: 0.9367, 3: 0.0202})

# Section 5.2: Modeling - Clustering

# Clustering Consulting Project - Solutions

A large technology firm needs your help, they've been hacked! Luckily their forensic engineers have grabbed valuable data about the hacks, including information like session time,locations, wpm typing speed, etc. The forensic engineer relates to you what she has been able to figure out so far, she has been able to grab meta data of each session that the hackers used to connect to their servers. These are the features of the data:

* 'Session_Connection_Time': How long the session lasted in minutes
* 'Bytes Transferred': Number of MB transferred during session
* 'Kali_Trace_Used': Indicates if the hacker was using Kali Linux
* 'Servers_Corrupted': Number of server corrupted during the attack
* 'Pages_Corrupted': Number of pages illegally accessed
* 'Location': Location attack came from (Probably useless because the hackers used VPNs)
* 'WPM_Typing_Speed': Their estimated typing speed based on session logs.


The technology firm has 3 potential hackers that perpetrated the attack. Their certain of the first two hackers but they aren't very sure if the third hacker was involved or not. They have requested your help! Can you help figure out whether or not the third suspect had anything to do with the attacks, or was it just two hackers? It's probably not possible to know for sure, but maybe what you've just learned about Clustering can help!

**One last key fact, the forensic engineer knows that the hackers trade off attacks. Meaning they should each have roughly the same amount of attacks. For example if there were 100 total attacks, then in a 2 hacker situation each should have about 50 hacks, in a three hacker situation each would have about 33 hacks. The engineer believes this is the key element to solving this, but doesn't know how to distinguish this unlabeled data into groups of hackers.**

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('hack_find').getOrCreate()
# Start Spark Session

In [6]:
from pyspark.ml.clustering import KMeans
# Loads data.
dataset = spark.read.csv("/home/ubuntu/classfiles/hack_data.csv",header=True,inferSchema=True)

In [7]:
dataset.head()
#Dataset is the same as described in the schema

Row(Session_Connection_Time=8.0, Bytes Transferred=391.09, Kali_Trace_Used=1, Servers_Corrupted=2.96, Pages_Corrupted=7.0, Location='Slovenia', WPM_Typing_Speed=72.37)

In [8]:
dataset.describe().show()

+-------+-----------------------+------------------+------------------+-----------------+------------------+-----------+------------------+
|summary|Session_Connection_Time| Bytes Transferred|   Kali_Trace_Used|Servers_Corrupted|   Pages_Corrupted|   Location|  WPM_Typing_Speed|
+-------+-----------------------+------------------+------------------+-----------------+------------------+-----------+------------------+
|  count|                    334|               334|               334|              334|               334|        334|               334|
|   mean|     30.008982035928145| 607.2452694610777|0.5119760479041916|5.258502994011977|10.838323353293413|       null|57.342395209580864|
| stddev|     14.088200614636158|286.33593163576757|0.5006065264451406| 2.30190693339697|  3.06352633036022|       null| 13.41106336843464|
|    min|                    1.0|              10.0|                 0|              1.0|               6.0|Afghanistan|              40.0|
|    max|           

In [9]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [10]:
feat_cols = ['Session_Connection_Time', 'Bytes Transferred', 'Kali_Trace_Used',
             'Servers_Corrupted', 'Pages_Corrupted','WPM_Typing_Speed']

In [11]:
vec_assembler = VectorAssembler(inputCols = feat_cols, outputCol='features')

In [12]:
final_data = vec_assembler.transform(dataset)

In [15]:
from pyspark.ml.feature import StandardScaler
# Kmeans is very sensitive to data scale

In [16]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

In [17]:
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_data)

In [18]:
# Normalize each feature to have unit standard deviation.
cluster_final_data = scalerModel.transform(final_data)

#### trying to find out how many clusters there are:

In [28]:
kmeans4 = KMeans(featuresCol='scaledFeatures',k=4)
kmeans3 = KMeans(featuresCol='scaledFeatures',k=3)
kmeans2 = KMeans(featuresCol='scaledFeatures',k=2)

In [29]:
model_k4 = kmeans4.fit(cluster_final_data)
model_k3 = kmeans3.fit(cluster_final_data)
model_k2 = kmeans2.fit(cluster_final_data)

In [30]:
wssse_k4 = model_k4.computeCost(cluster_final_data)
wssse_k3 = model_k3.computeCost(cluster_final_data)
wssse_k2 = model_k2.computeCost(cluster_final_data)

In [31]:
print("With K=4")
print("Within Set Sum of Squared Errors = " + str(wssse_k4))
print("With K=3")
print("Within Set Sum of Squared Errors = " + str(wssse_k3))
print('--'*30)
print("With K=2")
print("Within Set Sum of Squared Errors = " + str(wssse_k2))


With K=4
Within Set Sum of Squared Errors = 267.1336116887891
With K=3
Within Set Sum of Squared Errors = 434.1492898715845
------------------------------------------------------------
With K=2
Within Set Sum of Squared Errors = 601.7707512676716


#### Not much to be gained from the WSSSE, after all, we would expect that as K increases, the WSSSE decreases. We could however continue the analysis by seeing the drop from K=3 to K=4 to check if the clustering favors even or odd numbers. This won't be substantial, but its worth a look:

In [24]:
for k in range(2,9):
    kmeans = KMeans(featuresCol='scaledFeatures',k=k)
    model = kmeans.fit(cluster_final_data)
    wssse = model.computeCost(cluster_final_data)
    print("With K={}".format(k))
    print("Within Set Sum of Squared Errors = " + str(wssse))
    print('--'*30)

With K=2
Within Set Sum of Squared Errors = 601.7707512676716
------------------------------------------------------------
With K=3
Within Set Sum of Squared Errors = 434.1492898715845
------------------------------------------------------------
With K=4
Within Set Sum of Squared Errors = 267.1336116887891
------------------------------------------------------------
With K=5
Within Set Sum of Squared Errors = 252.33878938818208
------------------------------------------------------------
With K=6
Within Set Sum of Squared Errors = 234.08980700973405
------------------------------------------------------------
With K=7
Within Set Sum of Squared Errors = 205.5224625302034
------------------------------------------------------------
With K=8
Within Set Sum of Squared Errors = 190.92902865766914
------------------------------------------------------------


#### ** Nothing definitive can be said with the above, but wait! The last key fact that the engineer mentioned was that the attacks should be evenly numbered between the hackers! Let's check with the transform and prediction columns that result form this! **

In [25]:
model_k3.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   84|
|         2|   83|
|         0|  167|
+----------+-----+



In [26]:
model_k2.transform(cluster_final_data).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  167|
|         0|  167|
+----------+-----+



# In conclusion, with given challenge, it is obvious that there are 2 clusters, meaning there are 2 hackers