## There are three main components of Spark

### SparkContext --> communicates with the cluster manager (as a Python object) and creates RDDs
### SparkConf --> defines the configuration Spark runs with
### SparkShell --> interactive shell for processing and analyzing data

#### With SparkContext you can submit jobs, schedule and complete tasks, and manage resources

In [36]:
from pyspark import SparkContext
# In the pyspark shell `sc` already exists; uncomment this line in a standalone script
#sc = SparkContext("local","hands on Python") # master and appName


[spark_documentation](https://spark.apache.org/docs/2.2.0/api/python/pyspark.html)

In [2]:
# spark in execution 
visitors = [ 10,3,35,25,41,9,29]
df_visitors = sc.parallelize(visitors) # creating an RDD
df_visitors_year = df_visitors.map(lambda x:x*365) # Transformation
df_visitors_count = df_visitors_year.collect() #Action
print(df_visitors_count) 

[3650, 1095, 12775, 9125, 14965, 3285, 10585]
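
In Spark, `map` is a transformation that only describes the work, and nothing is computed until an action such as `collect` asks for results. Python's built-in `map` is lazy in a similar way, so the plain-Python sketch below (no Spark needed; the `calls` list exists only for illustration) mirrors the cell above:

```python
# Plain-Python analogy for lazy transformations and eager actions.
visitors = [10, 3, 35, 25, 41, 9, 29]
calls = []  # records when the function actually runs (illustration only)

def per_year(x):
    calls.append(x)
    return x * 365

lazy_result = map(per_year, visitors)   # "transformation": nothing runs yet
calls_before_action = len(calls)

visitors_per_year = list(lazy_result)   # "action": forces the computation
print(calls_before_action, len(calls))
print(visitors_per_year)
```

This is only an analogy: Spark additionally distributes the work across partitions, which a Python iterator does not do.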


In [3]:
# Spark shell
# DataFrame operations in the pyspark shell
text_file = spark.read.text("/usr/local/spark/README.md") # copied from 

In [4]:
text_file.count()

105

In [5]:
text_file.first()

Row(value='# Apache Spark')

In [6]:
lines_with_spark = text_file.filter(text_file.value.contains("Spark"))

In [7]:
lines_with_spark.count()

20

### Read about SparkConf on its official site
#### It has many options, such as master and appName

## The UCI Machine Learning Repository has many datasets for analysis and ML

[uci_website](https://archive.ics.uci.edu/ml/index.php)
#### Search for the KDD Cup 1999 data set on this site
[here is the link](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html)

In [None]:
#link to data http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz

import urllib.request # to fetch data from internet

f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz","kddcup.data.gz")
#download and store file 

In [None]:
ls

In [None]:
# the data is now downloaded into the current folder


# SparkRDD


In [8]:
raw_data = sc.textFile("./kddcup.data.gz") # Spark can read compressed data directly

In [9]:
raw_data 

./kddcup.data.gz MapPartitionsRDD[19] at textFile at NativeMethodAccessorImpl.java:0

## Parallelization with Spark RDDs

In [10]:
a = list(range(1,101))
lst_rdd = sc.parallelize(a)

In [11]:
lst_rdd

ParallelCollectionRDD[20] at parallelize at PythonRDD.scala:195

In [12]:
lst_rdd.count() #action

100

In [13]:
len(a)

100

In [14]:
lst_rdd.take(10)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [15]:
lst_rdd.reduce(lambda a,b:a+b)

5050
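
`reduce` combines the elements inside each partition first and then combines the partial results, so the function passed to it should be associative and commutative. A rough plain-Python model of that behaviour follows; the `chunks` helper and the partition counts are assumptions for illustration, not Spark's actual partitioning:

```python
from functools import reduce

def chunks(items, n_parts):
    """Split items into contiguous slices (a stand-in for partitions)."""
    size = -(-len(items) // n_parts)  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]

def partitioned_reduce(items, op, n_parts):
    # Reduce each partition on its own, then reduce the partial results.
    partials = [reduce(op, part) for part in chunks(items, n_parts)]
    return reduce(op, partials)

numbers = list(range(1, 101))
add = lambda a, b: a + b
sub = lambda a, b: a - b

# Addition gives the same answer however the data is split.
sums = {partitioned_reduce(numbers, add, n) for n in (1, 2, 4, 10)}
# Subtraction is not associative, so the answer depends on the split.
diffs = {partitioned_reduce(numbers, sub, n) for n in (1, 2, 4, 10)}
print(sums)
print(len(diffs) > 1)
```

With addition every split agrees on 5050; with subtraction the result changes with the number of partitions, which is why such functions are unsafe to pass to `reduce`.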

### Data in memory (very slow if we work with plain Python objects rather than parallelized PySpark RDDs)

In [16]:
data_in_memory = raw_data.takeSample(False,10000,42) # withReplacement, num, seed

In [17]:
contain_normal_py = [ line.split(",") for line in data_in_memory \
                     if "normal" in line ]


In [18]:
len(contain_normal_py)

1981

## Spark Way

#### Splitting DataSets and Creating New Combinations with Set Operations

In [19]:
sampled = raw_data.sample(False,.10,seed=42)
normal_sample = sampled.filter(lambda line : "normal" in line)

In [20]:
normal_sample.count()

97404
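
The two sampling calls behave differently: `takeSample` returns exactly the requested number of elements as a Python list on the driver, while `sample` without replacement keeps each element with the given probability and returns another RDD, so its size is only approximately the requested fraction. A plain-Python model of the difference (the function names and the stand-in data are hypothetical):

```python
import random

def take_sample(items, num, seed):
    """Model of takeSample without replacement: exactly `num` items, as a list."""
    return random.Random(seed).sample(items, num)

def bernoulli_sample(items, fraction, seed):
    """Model of sample without replacement: keep each item with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]

lines = list(range(100_000))          # stand-in for the lines of the data set
exact = take_sample(lines, 10_000, seed=42)
approx = bernoulli_sample(lines, 0.10, seed=42)

print(len(exact))                     # exactly the requested size
print(len(approx))                    # near 10% of the input, not a fixed size
```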

In [21]:
non_normal_sample = sampled.subtract(normal_sample)

In [22]:
sampled.count()

490705

In [23]:
non_normal_sample.count()

393301

In [24]:
normal_sample.count() + non_normal_sample.count()

490705
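
`subtract` returns the elements of one RDD that do not appear in the other, which is why the two counts add back up to the size of the sample. A minimal plain-Python sketch of the same split, using a few invented, shortened lines rather than real KDD records:

```python
# Invented, shortened records for illustration only.
sampled = [
    "0,tcp,http,normal.",
    "0,icmp,ecr_i,smurf.",
    "2,tcp,smtp,normal.",
    "0,tcp,private,neptune.",
]

normal = [line for line in sampled if "normal" in line]

# Set difference: keep what is in `sampled` but not in `normal`.
excluded = set(normal)
non_normal = [line for line in sampled if line not in excluded]

print(len(normal), len(non_normal), len(sampled))
```

In Spark, `subtract` typically involves a shuffle. Because `normal_sample` is defined by a predicate here, `sampled.filter(lambda line: "normal" not in line)` should give the same split without one.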

In [25]:
feature_1 = sampled.map(lambda line : line.split(",")).map(
                                lambda f: f[1]).distinct()

In [26]:
feature_2 = sampled.map(lambda line : line.split(",")).map(
                                lambda f: f[2]).distinct()

In [27]:
f1 = feature_1.collect()
f2 = feature_2.collect()

In [28]:
f1

['tcp', 'udp', 'icmp']

In [29]:
print(f2)

['http', 'finger', 'auth', 'domain_u', 'smtp', 'ftp', 'telnet', 'eco_i', 'ntp_u', 'ecr_i', 'other', 'private', 'pop_3', 'ftp_data', 'daytime', 'remote_job', 'supdup', 'name', 'ssh', 'domain', 'gopher', 'time', 'rje', 'ctf', 'mtp', 'X11', 'urp_i', 'pm_dump', 'IRC', 'exec', 'bgp', 'nnsp', 'iso_tsap', 'http_443', 'login', 'shell', 'printer', 'efs', 'courier', 'uucp', 'kshell', 'klogin', 'whois', 'echo', 'discard', 'systat', 'netstat', 'hostnames', 'csnet_ns', 'pop_2', 'sunrpc', 'uucp_path', 'nntp', 'netbios_ns', 'netbios_ssn', 'netbios_dgm', 'imap4', 'sql_net', 'vmnet', 'link', 'Z39_50', 'ldap', 'urh_i', 'tftp_u', 'red_i', 'tim_i']


## Cartesian product: all possible combinations

In [30]:
comb = feature_1.cartesian(feature_2).collect()

In [31]:
comb

[('tcp', 'http'),
 ('tcp', 'finger'),
 ('tcp', 'auth'),
 ('tcp', 'domain_u'),
 ('tcp', 'smtp'),
 ('tcp', 'ftp'),
 ('tcp', 'telnet'),
 ('tcp', 'eco_i'),
 ('tcp', 'ntp_u'),
 ('tcp', 'ecr_i'),
 ('tcp', 'other'),
 ('tcp', 'private'),
 ('tcp', 'pop_3'),
 ('tcp', 'ftp_data'),
 ('tcp', 'daytime'),
 ('tcp', 'remote_job'),
 ('tcp', 'supdup'),
 ('tcp', 'name'),
 ('tcp', 'ssh'),
 ('tcp', 'domain'),
 ('tcp', 'gopher'),
 ('tcp', 'time'),
 ('tcp', 'rje'),
 ('tcp', 'ctf'),
 ('tcp', 'mtp'),
 ('tcp', 'X11'),
 ('tcp', 'urp_i'),
 ('tcp', 'pm_dump'),
 ('tcp', 'IRC'),
 ('tcp', 'exec'),
 ('tcp', 'bgp'),
 ('tcp', 'nnsp'),
 ('tcp', 'iso_tsap'),
 ('tcp', 'http_443'),
 ('tcp', 'login'),
 ('tcp', 'shell'),
 ('tcp', 'printer'),
 ('tcp', 'efs'),
 ('tcp', 'courier'),
 ('tcp', 'uucp'),
 ('tcp', 'kshell'),
 ('tcp', 'klogin'),
 ('tcp', 'whois'),
 ('tcp', 'echo'),
 ('tcp', 'discard'),
 ('tcp', 'systat'),
 ('tcp', 'netstat'),
 ('tcp', 'hostnames'),
 ('tcp', 'csnet_ns'),
 ('tcp', 'pop_2'),
 ('tcp', 'sunrpc'),
 ('tcp', 'u

In [32]:
len(comb)

198
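
The length is the product of the two input sizes: 3 protocols times 66 services gives 198 pairs. The same operation on small local lists can be sketched with `itertools.product` (the service list is shortened for illustration):

```python
from itertools import product

protocols = ["tcp", "udp", "icmp"]
services = ["http", "smtp", "ftp"]      # shortened list, for illustration

pairs = list(product(protocols, services))
print(pairs[:3])
print(len(pairs) == len(protocols) * len(services))

# With the full lists collected above: 3 protocols x 66 services
print(3 * 66)
```

Because the output grows multiplicatively, collecting a Cartesian product to the driver is only reasonable when both inputs are small, as they are here.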

## Aggregating and Summarizing Data into Useful Reports

### loading data

In [33]:
raw_data = sc.textFile("./kddcup.data.gz")

### Calculating averages with map and reduce

In [40]:
csv = raw_data.map(lambda x: x.split(","))
normal_data = csv.filter(lambda x : x[41] == "normal.")
duration = normal_data.map(lambda x : int(x[0]))
total_duration = duration.reduce(lambda x,y : x + y)
total_duration


211895753

In [41]:
total_duration/(normal_data.count())

217.82472416710442

## Faster Average Computation with Aggregate

In [50]:
# aggregate can calculate total_duration, count, all in one

duration_count = normal_data.aggregate(
    (0,0),
    lambda db,new_value  : (db[0]+ int(new_value[0]),db[1]+1),
    lambda db1,db2 : (db1[0]+db2[0],db1[1]+db2[1])
)
duration_count[0] / duration_count[1]


217.82472416710442

In [51]:
duration_count

(211895753, 972781)
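
`aggregate` takes a zero value, a function that folds one record into a partition's accumulator, and a function that merges two accumulators. The plain-Python model below shows how the `(total, count)` pair is built; the `aggregate` helper and the sample records are hypothetical stand-ins, not Spark code:

```python
from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    """Plain-Python model of RDD.aggregate over a list of partitions."""
    # seq_op folds each record of a partition into that partition's accumulator.
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # comb_op merges the per-partition accumulators into one result.
    return reduce(comb_op, partials, zero)

# Hypothetical records: only field 0 (duration) matters here.
partitions = [
    [["0", "tcp"], ["10", "udp"]],
    [["20", "tcp"]],
    [],                                   # empty partitions are fine
]

total, count = aggregate(
    partitions,
    (0, 0),
    lambda acc, rec: (acc[0] + int(rec[0]), acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print(total, count, total / count)
```

Carrying the sum and the count together means the data is scanned once, instead of once for `reduce` and again for `count`.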

## Pivot Tabling with Key Value Paired Data Points

##### Pivot tables group values by key

### Example 

* People with different favorite fruits
* How many people have "Apple" as their favorite fruit?
* Grouping based on:

    * People -> value
    * Fruit -> key
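
The fruit example can be sketched in plain Python before moving to Spark. The names and pairs below are invented for illustration:

```python
from collections import defaultdict

# Hypothetical (fruit, person) pairs: fruit is the key, person is the value.
favorites = [
    ("Apple", "Asha"),
    ("Banana", "Ben"),
    ("Apple", "Chen"),
    ("Cherry", "Dana"),
    ("Apple", "Eli"),
]

people_by_fruit = defaultdict(list)
for fruit, person in favorites:
    people_by_fruit[fruit].append(person)

apple_fans = len(people_by_fruit["Apple"])
print(dict(people_by_fruit))
print(apple_fans)
```

In Spark the same idea is expressed on an RDD of `(key, value)` tuples with operations such as `reduceByKey` and `countByKey`.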

In [52]:
csv

PythonRDD[64] at RDD at PythonRDD.scala:53

In [55]:
print(csv.first())

['0', 'tcp', 'http', 'SF', '215', '45076', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '0.00', '0.00', '0.00', '0.00', '1.00', '0.00', '0.00', '0', '0', '0.00', '0.00', '0.00', '0.00', '0.00', '0.00', '0.00', '0.00', 'normal.']


In [58]:
# Sum the duration (field 0) per label (field 41)
kv_duration = csv.map(lambda x : ( x[41], float(x[0]))).reduceByKey(lambda x,y: x+y)
kv = kv_duration  # same reduced RDD, reused in the cells below
kv_duration.collect()

[('normal.', 211895753.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 2.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 24257982.0),
 ('ipsweep.', 13049.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 500.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]

In [60]:
kv.countByKey()  # kv is already reduced by key, so every label appears exactly once

defaultdict(int,
            {'normal.': 1,
             'buffer_overflow.': 1,
             'loadmodule.': 1,
             'perl.': 1,
             'neptune.': 1,
             'smurf.': 1,
             'guess_passwd.': 1,
             'pod.': 1,
             'teardrop.': 1,
             'portsweep.': 1,
             'ipsweep.': 1,
             'land.': 1,
             'ftp_write.': 1,
             'back.': 1,
             'imap.': 1,
             'satan.': 1,
             'phf.': 1,
             'nmap.': 1,
             'multihop.': 1,
             'warezmaster.': 1,
             'warezclient.': 1,
             'spy.': 1,
             'rootkit.': 1})

In [61]:
kv.countByValue()  # counts each distinct (label, total) element, again once each

defaultdict(int,
            {('normal.', 211895753.0): 1,
             ('buffer_overflow.', 2751.0): 1,
             ('loadmodule.', 326.0): 1,
             ('perl.', 124.0): 1,
             ('neptune.', 2.0): 1,
             ('smurf.', 0.0): 1,
             ('guess_passwd.', 144.0): 1,
             ('pod.', 0.0): 1,
             ('teardrop.', 0.0): 1,
             ('portsweep.', 24257982.0): 1,
             ('ipsweep.', 13049.0): 1,
             ('land.', 0.0): 1,
             ('ftp_write.', 259.0): 1,
             ('back.', 284.0): 1,
             ('imap.', 72.0): 1,
             ('satan.', 500.0): 1,
             ('phf.', 18.0): 1,
             ('nmap.', 0.0): 1,
             ('multihop.', 1288.0): 1,
             ('warezmaster.', 301.0): 1,
             ('warezclient.', 627563.0): 1,
             ('spy.', 636.0): 1,
             ('rootkit.', 1008.0): 1})
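
`countByKey` tallies how often each key occurs and `countByValue` tallies how often each whole element occurs, so both are more informative on pairs that have not been reduced yet. A plain-Python model with `collections.Counter` (the sample pairs are invented):

```python
from collections import Counter

# Hypothetical un-reduced (label, duration) pairs.
pairs = [
    ("normal.", 0.0),
    ("smurf.", 0.0),
    ("normal.", 5.0),
    ("normal.", 0.0),
]

count_by_key = Counter(label for label, _ in pairs)   # like countByKey
count_by_value = Counter(pairs)                       # like countByValue

print(count_by_key)
print(count_by_value)
```

Applied to the un-reduced `csv.map(lambda x: (x[41], float(x[0])))`, `countByKey` would presumably report the number of connections per label rather than 1 for each.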

### Computing Summary Statistics with MLlib

##### what are summary statistics ? 
##### How do we use MLlib to create summary statistics ? 

#### MLlib

* Machine learning library that comes with Spark
* A relatively recent development that builds on Spark's data processing capabilities
* Gives more seamless, deployable solutions

[Documentation](https://spark.apache.org/docs/2.2.0/api/python/pyspark.mllib.html)

### Powerful exploratory data analysis with MLlib
###### Regression, another central task in machine learning, is all about predicting numbers. In this section, we explore Spark's capabilities to perform regression tasks with models like linear regression and SVMs.

### Computing summary statistics with MLlib

In [62]:
raw_data = sc.textFile("./kddcup.data.gz")

In [63]:
csv = raw_data.map(lambda x:x.split(','))
duration = csv.map(lambda x: [int(x[0])])

In [64]:
from pyspark.mllib.stat import Statistics
summary = Statistics.colStats(duration)

In [65]:
summary

<pyspark.mllib.stat._statistics.MultivariateStatisticalSummary at 0x7f75ad7f82b0>

In [66]:
summary.mean()

array([48.34243046])

In [67]:
summary.count()

4898431

In [69]:
summary.min()

array([0.])

In [71]:
summary.variance()

array([523206.01584972])

In [73]:
from math import sqrt

sqrt(summary.variance()[0])

723.3298112546713
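
The same quantities can be checked on a small local list with Python's `statistics` module. This sketch uses made-up durations and assumes the variance reported by the summary is the sample variance (dividing by n - 1):

```python
import statistics
from math import sqrt

durations = [0, 0, 2, 10, 300]          # made-up values for illustration

mean = statistics.mean(durations)
variance = statistics.variance(durations)   # sample variance (n - 1)
std_dev = sqrt(variance)

print(mean, min(durations), max(durations), len(durations))
print(variance, std_dev)
```

A standard deviation far larger than the mean, as in the real output above, suggests a heavily skewed column: most durations are near zero and a few are very long.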

## Using Pearson and Spearman to Discover Correlations

<p>Pearson's correlation coefficient is the covariance of the two variables divided by the product of their standard deviations. The form of the definition involves a "product moment", that is, the mean (the first moment about the origin) of the product of the mean-adjusted random variables; hence the modifier product-moment in the name.</p>

In [74]:
# p(x, y) = cov(x, y) / (std(x) * std(y))

In [75]:
# duration, src_bytes, dst_bytes as explicit floats
metrics = csv.map(lambda x: [float(x[0]), float(x[4]), float(x[5])])
Statistics.corr(metrics,method="spearman")

array([[ 1.        ,  0.00890383,  0.30144701],
       [ 0.00890383,  1.        , -0.19510495],
       [ 0.30144701, -0.19510495,  1.        ]])

In [77]:
Statistics.corr(metrics,method="pearson")

array([[1.00000000e+00, 4.12205545e-02, 2.03915936e-02],
       [4.12205545e-02, 1.00000000e+00, 2.39337570e-04],
       [2.03915936e-02, 2.39337570e-04, 1.00000000e+00]])
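
The two methods differ in what they measure: Pearson captures linear association, while Spearman is Pearson applied to the ranks of the data and so captures any monotonic association. A small stdlib-only sketch under those definitions (the helper names and the sample data are illustrative, not MLlib's implementation):

```python
from math import sqrt

def pearson(xs, ys):
    """Covariance over the product of standard deviations (the 1/n factors cancel)."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

def ranks(values):
    """1-based ranks, with tied values sharing the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg_rank = (i + j) / 2 + 1
        for k in range(i, j + 1):
            result[order[k]] = avg_rank
        i = j + 1
    return result

def spearman(xs, ys):
    """Pearson correlation computed on the ranks of the data."""
    return pearson(ranks(xs), ranks(ys))

xs = [1, 2, 3, 4, 5]
ys = [1, 4, 9, 16, 25]                 # monotonic but not linear in xs
print(round(pearson(xs, ys), 4))
print(spearman(xs, ys))
```

Here Spearman is exactly 1 because the relationship is perfectly monotonic, while Pearson is slightly below 1 because it is not linear. The gap between the two matrices above may reflect a similar effect from skewed, non-linear columns.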

#### ML and data science revolve around understanding:
###### Statistics
###### How data behaves
###### How machine learning models are grounded

In [78]:
# PySpark is a big calculator
# PySpark hides the complexity of the mathematics underneath it

### Testing your hypotheses on large datasets

#### Pearson's Chi Square Test

* Measures how likely it is that observed differences arose by chance
* It refers to the chi-squared distribution
* For more information on the chi-squared distribution - [Click Here](https://en.wikipedia.org/wiki/Chi-squared_distribution)
* It has three variations
* One of these is the goodness-of-fit test
* It checks whether an observed dataset is distributed differently from a theoretical one

In [79]:
from pyspark.mllib.linalg import Vectors

In [80]:
visitors_freq = Vectors.dense(0.13,0.61,0.8,0.5,0.3)
print(Statistics.chiSqTest(visitors_freq))

Chi squared test summary:
method: pearson
degrees of freedom = 4 
statistic = 0.5852136752136753 
pValue = 0.9646925263439344 
No presumption against null hypothesis: observed follows the same distribution as expected..
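
The reported statistic can be reproduced by hand. The sketch below assumes that, when no expected vector is given, the test compares the observed values against a uniform distribution; the closed-form p-value is specific to 4 degrees of freedom:

```python
from math import exp

observed = [0.13, 0.61, 0.8, 0.5, 0.3]

# Assumption: with no expected vector, the test compares against a uniform
# distribution, so each category expects an equal share of the total.
expected = [sum(observed) / len(observed)] * len(observed)

statistic = sum((o - e) ** 2 / e for o, e in zip(observed, expected))
dof = len(observed) - 1

# For 4 degrees of freedom the chi-squared survival function has the closed
# form exp(-x/2) * (1 + x/2); other values of dof need a general routine.
p_value = exp(-statistic / 2) * (1 + statistic / 2)

print(dof, statistic, p_value)
```

The statistic comes out near 0.585 and the p-value near 0.965, in line with the summary printed above. A p-value that high gives no reason to reject the hypothesis that the observed frequencies follow the expected distribution.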


### Putting Structure on Your Big Data with SparkSQL
#### Taking away labels means that we are in unsupervised learning territory. Spark has great support for clustering and dimensionality reduction algorithms.
* Manipulating DataFrame with SparkSQL schemas
* Using the Spark DSL to build queries for structured data operations

In [3]:
raw_data = sc.textFile("./kddcup.data.gz")

In [4]:
from pyspark.sql import Row, SQLContext

In [5]:
sql_context = SQLContext(sc)

In [6]:
csv = raw_data.map(lambda l: l.split(","))

In [7]:
rows = csv.map(lambda p: Row(duration=int(p[0]),protocol=p[1],\
               service=p[2]))

In [8]:
df = sql_context.createDataFrame(rows)

In [9]:
df.registerTempTable("rdd")  # deprecated since Spark 2.0; df.createOrReplaceTempView("rdd") is the replacement

In [12]:
sql_context.sql("SELECT duration FROM rdd WHERE protocol='tcp' AND duration > 2000").show()

+--------+
|duration|
+--------+
|   10217|
|   11610|
|   13724|
|   10934|
|   12026|
|    5506|
|   12381|
|    9733|
|   17932|
|   40504|
|   11565|
|   12454|
|    9473|
|   12865|
|   11288|
|   10501|
|   14479|
|   10774|
|   10007|
|   12828|
+--------+
only showing top 20 rows



### Using the Spark DSL to build queries for structured data operations 

In [13]:
df.select("duration").filter(df.duration>2000).filter(df.protocol=='tcp').show()

+--------+
|duration|
+--------+
|   10217|
|   11610|
|   13724|
|   10934|
|   12026|
|    5506|
|   12381|
|    9733|
|   17932|
|   40504|
|   11565|
|   12454|
|    9473|
|   12865|
|   11288|
|   10501|
|   14479|
|   10774|
|   10007|
|   12828|
+--------+
only showing top 20 rows
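
Both cells return the same rows because the SQL string and the chained `filter` calls express the same two predicates. That equivalence can be checked locally without a cluster. The sketch below uses SQLite as a stand-in for Spark SQL and a list comprehension as a stand-in for the DSL; the rows are invented for illustration:

```python
import sqlite3

# Hypothetical rows shaped like the DataFrame above: (duration, protocol, service).
rows = [
    (10217, "tcp", "telnet"),
    (0, "tcp", "http"),
    (5000, "udp", "other"),
    (2500, "tcp", "ftp"),
]

# SQL style, using SQLite as a local stand-in for Spark SQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rdd (duration INTEGER, protocol TEXT, service TEXT)")
conn.executemany("INSERT INTO rdd VALUES (?, ?, ?)", rows)
sql_result = [
    r[0]
    for r in conn.execute(
        "SELECT duration FROM rdd WHERE protocol = 'tcp' AND duration > 2000"
    )
]
conn.close()

# Method-chaining style: the same two predicates applied to each row.
dsl_result = [d for d, p, _ in rows if d > 2000 and p == "tcp"]

print(sorted(sql_result) == sorted(dsl_result))
```

Which style to use in Spark is largely a matter of preference: SQL strings are familiar to analysts, while the DSL keeps column references as Python expressions.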

