<center><h1>Introduction to the Cloud and Apache Spark</h1></center>

<h2>Overview</h2>

<ul>
    <li>Introduction to the Cloud</li>
    <li>Context</li>
<li>Basics of Apache Spark</li>
    <li>Run Apache Spark on the Cloud</li>
</ul>

<h2>Why would you go with the Cloud?</h2>

<img src="http://stat.cmu.edu/~mfarag/652/lectures/l2/cloud_characteristics.png" height="250px"/>

<h2>What is Cloud in the REAL World?</h2>

<ul>
<li><b>“Cloud”</b> refers to large Internet services running on tens of thousands of machines (Amazon, Google, Microsoft, etc.)</li>

<li><b>“Cloud computing”</b> refers to services offered by these companies that let external customers rent compute cycles and storage:

<ul>
    <li>Amazon EC2: virtual machines at roughly 8.5¢/hour</li>
    <li>Amazon S3: storage at roughly 21¢/GB/month</li>
    <li>Google App Engine</li>
    <li>Microsoft Azure (formerly Windows Azure)</li>
    </ul>
</li>
    </ul>
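To get a feel for what renting cycles and storage means in practice, here is a minimal back-of-the-envelope sketch that uses the approximate prices quoted above. The function name `monthly_cost_usd` and the workload numbers are hypothetical and chosen only for illustration; real prices vary by provider, region, and machine type.

```python
# Approximate prices quoted above (they change over time and by region).
EC2_CENTS_PER_HOUR = 8.5       # one virtual machine
S3_CENTS_PER_GB_MONTH = 21.0   # storage

def monthly_cost_usd(num_vms, hours_per_vm, storage_gb):
    """Rough monthly bill in US dollars for rented compute plus storage."""
    compute_cents = num_vms * hours_per_vm * EC2_CENTS_PER_HOUR
    storage_cents = storage_gb * S3_CENTS_PER_GB_MONTH
    return (compute_cents + storage_cents) / 100

# Hypothetical workload: 10 VMs for 40 hours each, plus 500 GB stored.
print(monthly_cost_usd(num_vms=10, hours_per_vm=40, storage_gb=500))  # 139.0
```

The point of the exercise is that you pay only for the hours and gigabytes you actually use, rather than buying the machines up front.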
 

<h2>Connect the Dots!</h2>

<img src="http://www.andrew.cmu.edu/~mfarag/static/mlmodeling.png" height="250px"/>

<h2>Context</h2>

In the last lecture, we discussed the machine learning modeling process. Now let's switch gears and discuss the technical infrastructure, which can later be deployed to the cloud. You need Spark installed on your machine to run the code snippets in this lecture.

In this class, we will use <b>Apache Spark</b> for data preparation, cleaning, and feature engineering. But why Spark?

<h2>Why Spark?</h2>

<ul>
<li>Developed in 2009 at UC Berkeley's AMPLab and open sourced in 2010.</li>
<li>Spark is widely seen as the successor to the popular Hadoop MapReduce framework.</li>

<li>Gartner, Advanced Analytics and Data Science (2014): "Organizations that are looking at big data challenges – including collection, ETL, storage, exploration and analytics – should consider Spark for its in-memory performance and the breadth of its model. It supports advanced analytics solutions on Hadoop clusters, including the iterative model required for machine learning and graph analysis."</li>
</ul>

<h2>Pandas vs PySpark (Spark's Python API)</h2>

In a benchmark by Databricks, an industry leader in big data storage and processing, PySpark outperformed a traditional pandas implementation.
<center><figure><img src="http://stat.cmu.edu/~mfarag/14810/l6/pandas_vs_pyspark.png"/></figure></center>


<h2>What is Spark?</h2>

<ul>
<li>Apache Spark is a fast and general-purpose cluster computing system for large scale data processing.</li>
<li>Spark was originally written in Scala, which allows concise function syntax and interactive use.</li>
    <li>Apache Spark provides high-level APIs in Java, Scala, Python (PySpark), and R.</li>
    <li>Apache Spark combines two different modes of processing:<ul>
        <li><b>Batch processing</b>, traditionally handled by systems such as Apache Hadoop MapReduce.</li>
        <li><b>Real-time (stream) processing</b>, traditionally handled by systems such as Apache Storm.</li>
        </ul>
        </li>
    </ul>

<center><figure><img src="http://stat.cmu.edu/~mfarag/14810/l3/batch_vs_realtime.png"/></figure></center>


<h2>Spark Ecosystem</h2>

<center><figure><img src="http://stat.cmu.edu/~mfarag/14810/l3/spark_ecosystem.png"/></figure></center>

<h2>Spark Components</h2>

<center><figure><img src="http://stat.cmu.edu/~mfarag/14810/l3/spark_components.png"/></figure></center>

<h3>Spark Core</h3>

Spark Core is the general execution engine of the Spark platform; all other functionality is built on top of it.
Spark has several advantages:
<ul>
<li>Speed: runs programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk</li>
    <li>Ease of Use: Write applications quickly in Java, Scala, Python, R</li>
<li>Generality: Combine SQL, streaming, and complex analytics</li>
<li>Runs Everywhere: Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3</li>
</ul>

In [1]:
# If you installed Spark on Windows, you may need findspark,
# which must be initialized before you can use pyspark.

!pip install findspark



In [2]:
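# findspark locates your Spark installation and makes pyspark importable.
# It is mostly needed on Windows; on other platforms you can usually comment out the next three lines.
# findspark.init() raises a ValueError (like the one below) when it cannot locate Spark, e.g. when SPARK_HOME is not set.
import findspark
findspark.init()
findspark.find()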
import pyspark
from pyspark.sql import SparkSession


spark = SparkSession.builder.master("local[*]").appName('SparkTest').getOrCreate()

ValueError: Couldn't find Spark, make sure SPARK_HOME env is set or Spark is in an expected location (e.g. from homebrew installation).

<h2>Download Data Files Remotely</h2>

In [None]:
!pip install wget

In [5]:
# Downloading and preprocessing KDD Train Data 
!python -m wget https://www.andrew.cmu.edu/user/mfarag/763/KDDTrain+.txt


Saved under KDDTrain+.txt


<h2>Cloud Consideration</h2>
If you run Spark on the Cloud, your data must first be moved into the cluster's distributed storage, HDFS (the Hadoop Distributed File System), so that every node in the cluster can read it. The following command copies the file from your local storage to the root of HDFS.

In [None]:
# Uncomment and Execute this line if you are running your notebook on the Cloud
#!hadoop fs -put KDDTrain+.txt /

<h2>Spark Dataframes</h2>

Inspired by pandas DataFrames in structure, format, and a few specific operations, Spark DataFrames are like distributed in-memory tables with named columns and schemas, where each column has a specific data type: integer, string, array, map, real, date, timestamp, etc. To a human’s eye, a Spark DataFrame is like a table.

When data are visualized as a structured table, it’s not only easy to digest but also easy to work with when it comes to common operations you might want to execute on rows and columns. <br/>Also, DataFrames are immutable and Spark keeps a lineage of all transformations. You can add or change the names and data types of the columns, creating new DataFrames while the previous versions are preserved. A named column in a DataFrame and its associated Spark data type can be declared in the schema.

Let's examine the generic and structured data types available in Spark before we use them to define a schema. Then we'll illustrate how to create a DataFrame with a schema.

<h2><a href="https://spark.apache.org/docs/latest/sql-ref-datatypes.html">Spark’s Basic Data Types</a></h2>

Spark supports basic internal data types. These data types can be declared in your Spark application or defined in your schema.

<center><figure><img src="http://stat.cmu.edu/~mfarag/14810/l5/spark_data_types.png"/></figure></center>


In [2]:
# Load data from a CSV file into a dataframe on a local machine.
# header=False means the first row is not a header
# sep=',' means the columns are separated by ','
df = spark.read.csv('KDDTrain+.txt', header=False, sep=",")
# on the Cloud, the files will have to be at the root level. So, the cloud version is:
#df = spark.read.csv('/KDDTrain+.txt', header=False, sep=",")

df.show(5, vertical=True)

-RECORD 0--------
 _c0  | 0        
 _c1  | tcp      
 _c2  | ftp_data 
 _c3  | SF       
 _c4  | 491      
 _c5  | 0        
 _c6  | 0        
 _c7  | 0        
 _c8  | 0        
 _c9  | 0        
 _c10 | 0        
 _c11 | 0        
 _c12 | 0        
 _c13 | 0        
 _c14 | 0        
 _c15 | 0        
 _c16 | 0        
 _c17 | 0        
 _c18 | 0        
 _c19 | 0        
 _c20 | 0        
 _c21 | 0        
 _c22 | 2        
 _c23 | 2        
 _c24 | 0.00     
 _c25 | 0.00     
 _c26 | 0.00     
 _c27 | 0.00     
 _c28 | 1.00     
 _c29 | 0.00     
 _c30 | 0.00     
 _c31 | 150      
 _c32 | 25       
 _c33 | 0.17     
 _c34 | 0.03     
 _c35 | 0.17     
 _c36 | 0.00     
 _c37 | 0.00     
 _c38 | 0.00     
 _c39 | 0.05     
 _c40 | 0.00     
 _c41 | normal   
 _c42 | 20       
-RECORD 1--------
 _c0  | 0        
 _c1  | udp      
 _c2  | other    
 _c3  | SF       
 _c4  | 146      
 _c5  | 0        
 _c6  | 0        
 _c7  | 0        
 _c8  | 0        
 _c9  | 0        
 _c10 | 0 

<h2>Avoiding Auto-assigned Column Names: Read CSV and Specify the Column Names</h2>

In [6]:
col_names = ["duration","protocol_type","service","flag","src_bytes",
    "dst_bytes","land","wrong_fragment","urgent","hot","num_failed_logins",
    "logged_in","num_compromised","root_shell","su_attempted","num_root",
    "num_file_creations","num_shells","num_access_files","num_outbound_cmds",
    "is_host_login","is_guest_login","count","srv_count","serror_rate",
    "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
    "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
    "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
    "dst_host_rerror_rate","dst_host_srv_rerror_rate","classes","difficulty_level"]

df = spark.read.csv("KDDTrain+.txt",header=False, inferSchema= True).toDF(*col_names)

# on the Cloud, the files will have to be at the root level. So, the cloud version is:
# df = spark.read.csv("/KDDTrain+.txt",header=False, inferSchema= True).toDF(*col_names)

df.show(1, vertical=True)

-RECORD 0-------------------------------
 duration                    | 0        
 protocol_type               | tcp      
 service                     | ftp_data 
 flag                        | SF       
 src_bytes                   | 491      
 dst_bytes                   | 0        
 land                        | 0        
 wrong_fragment              | 0        
 urgent                      | 0        
 hot                         | 0        
 num_failed_logins           | 0        
 logged_in                   | 0        
 num_compromised             | 0        
 root_shell                  | 0        
 su_attempted                | 0        
 num_root                    | 0        
 num_file_creations          | 0        
 num_shells                  | 0        
 num_access_files            | 0        
 num_outbound_cmds           | 0        
 is_host_login               | 0        
 is_guest_login              | 0        
 count                       | 2        
 srv_count      

<h2>More ways to Display Dataframes</h2>


1.   `df.take(5)` will return a list of five Row objects. 
2.   `df.collect()` will get all of the data from the entire DataFrame. Be really careful when using it, because if you have a large data set, you can easily crash the driver node. 
3.   `df.show()` is the most commonly used method to view a dataframe. It accepts a few parameters, such as the number of rows and truncation. For example, `df.show(5, False)` or `df.show(5, truncate=False)` will show the first five rows without truncating long values.
4.   `df.limit(5)` will **return a new DataFrame** containing the first five rows. Because Spark is distributed in nature, there is no guarantee that `df.limit()` will give you the same rows each time unless you impose an ordering first (for example with `orderBy`).

<h2>Schemas and Creating DataFrames</h2>

You can think about the dataframe as a table. A schema in Spark defines the column names and associated data types for a DataFrame. <b>Most often, schemas come into play when you are reading structured data from an external data source.</b> Defining a schema up front as opposed to taking a schema-on-read approach offers three benefits:

<ul>
<li>You relieve Spark from the onus of inferring data types.</li>
<li>You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.</li>

<li>You can detect errors early if data don't match the schema.</li>
</ul>
We will explore the creation of schemas and we want you to leverage them. That said, we will automatically infer the schemas when running the code in the lecture for simplicity.
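To make the third benefit concrete, here is a plain-Python sketch (not Spark itself) of what checking rows against a declared schema means. The helper `parse_row`, the three-column schema, and the sample rows are all hypothetical and exist only for illustration. How Spark actually treats a mismatching row depends on the reader's `mode` option.

```python
import csv
import io

# Declared schema: column name -> Python type used to parse the raw string.
schema = [("duration", int), ("protocol_type", str), ("serror_rate", float)]

def parse_row(raw_fields, schema):
    """Convert raw CSV strings to typed values, failing fast on a mismatch."""
    if len(raw_fields) != len(schema):
        raise ValueError(f"expected {len(schema)} fields, got {len(raw_fields)}")
    typed = {}
    for raw, (name, caster) in zip(raw_fields, schema):
        try:
            typed[name] = caster(raw)
        except ValueError as exc:
            raise ValueError(f"column {name!r}: cannot parse {raw!r}") from exc
    return typed

# Two made-up rows: the first is well formed, the second has a bad float.
sample = io.StringIO("0,tcp,0.00\n2,udp,oops\n")
rows = list(csv.reader(sample))

print(parse_row(rows[0], schema))
try:
    parse_row(rows[1], schema)
except ValueError as err:
    print("schema violation:", err)
```

With a declared schema, the bad value surfaces as soon as the row is read instead of silently becoming a string column that breaks a computation much later.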

You may also create your own schema as a string in which each field name is followed by its data type.

In [2]:
schema = "id INT, firstName STRING, WEBSITE STRING"
data = [[1, "John", "https://tinyurl.1"],
       [2, "Brooke", "https://tinyurl.2"]]

test_df = spark.createDataFrame(data, schema)
test_df.show()
test_df.printSchema()

+---+---------+-----------------+
| id|firstName|          WEBSITE|
+---+---------+-----------------+
|  1|     John|https://tinyurl.1|
|  2|   Brooke|https://tinyurl.2|
+---+---------+-----------------+

root
 |-- id: integer (nullable = true)
 |-- firstName: string (nullable = true)
 |-- WEBSITE: string (nullable = true)
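For a wide table, typing the schema string by hand is error-prone. The sketch below assembles the same kind of string from a list of (name, type) pairs. The helper name `build_ddl` is hypothetical, and the code is plain Python, so it runs without a Spark session.

```python
def build_ddl(fields):
    """Join (name, type) pairs into a schema string such as 'id INT, name STRING'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in fields)

# Same three fields as the cell above.
fields = [("id", "INT"), ("firstName", "STRING"), ("WEBSITE", "STRING")]
schema = build_ddl(fields)
print(schema)  # id INT, firstName STRING, WEBSITE STRING
```

The resulting string can be passed as the `schema` argument in the same way as the hand-written one, for example `spark.createDataFrame(data, schema)`.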



<h2>Display Schema Information for Your Dataframe</h2>

In [7]:
df.printSchema() # or df.dtypes

root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- num_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullable = true

<h2>Print Column Names in Your Dataframe</h2>

In [5]:
print(df.columns)

['duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login', 'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate', 'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate', 'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate', 'dst_host_srv_rerror_rate', 'classes', 'difficulty_level']


<h2>Print the Total Number of Records in Your Dataframe</h2>

In [7]:
print(df.count())

125973


<h2>Print a Sample Record from Your Dataframe</h2>

In [8]:
df.show(1, vertical=True)

-RECORD 0-------------------------------
 duration                    | 0        
 protocol_type               | tcp      
 service                     | ftp_data 
 flag                        | SF       
 src_bytes                   | 491      
 dst_bytes                   | 0        
 land                        | 0        
 wrong_fragment              | 0        
 urgent                      | 0        
 hot                         | 0        
 num_failed_logins           | 0        
 logged_in                   | 0        
 num_compromised             | 0        
 root_shell                  | 0        
 su_attempted                | 0        
 num_root                    | 0        
 num_file_creations          | 0        
 num_shells                  | 0        
 num_access_files            | 0        
 num_outbound_cmds           | 0        
 is_host_login               | 0        
 is_guest_login              | 0        
 count                       | 2        
 srv_count      

<h2>DataFrame Operations on Columns</h2>

1. Selecting Columns & Creating Subset Dataframes
2. Adding New Columns
3. Renaming Columns
4. Removing Columns

<h2>Create a Subset Dataframe from Your Dataframe</h2>

In [9]:
small_df = df.select("duration","protocol_type","service","classes","difficulty_level")
small_df.show(5)


+--------+-------------+--------+-------+----------------+
|duration|protocol_type| service|classes|difficulty_level|
+--------+-------------+--------+-------+----------------+
|       0|          tcp|ftp_data| normal|              20|
|       0|          udp|   other| normal|              15|
|       0|          tcp| private|neptune|              19|
|       0|          tcp|    http| normal|              21|
|       0|          tcp|    http| normal|              21|
+--------+-------------+--------+-------+----------------+
only showing top 5 rows



<h2>Display Summary Statistics in Your Dataframe</h2>

In [10]:
df.describe().show(vertical=True)

-RECORD 0-------------------------------------------
 summary                     | count                
 duration                    | 125973               
 protocol_type               | 125973               
 service                     | 125973               
 flag                        | 125973               
 src_bytes                   | 125973               
 dst_bytes                   | 125973               
 land                        | 125973               
 wrong_fragment              | 125973               
 urgent                      | 125973               
 hot                         | 125973               
 num_failed_logins           | 125973               
 logged_in                   | 125973               
 num_compromised             | 125973               
 root_shell                  | 125973               
 su_attempted                | 125973               
 num_root                    | 125973               
 num_file_creations          | 125973         

<h2>Display Unique Values from a Column in Your Dataframe</h2>

In [11]:
df.select("classes").distinct().show(40)

+---------------+
|        classes|
+---------------+
|        neptune|
|          satan|
|           nmap|
|      portsweep|
|           back|
|    warezclient|
|   guess_passwd|
|         normal|
|        rootkit|
|           perl|
|buffer_overflow|
|       multihop|
|        ipsweep|
|    warezmaster|
|           imap|
|       teardrop|
|            spy|
|           land|
|            pod|
|      ftp_write|
|          smurf|
|     loadmodule|
|            phf|
+---------------+



<h2>Add a Column to Your Dataframe</h2>

In [12]:
# We will add a new column called 'first_column' at the end
from pyspark.sql.functions import lit
df = df.withColumn('first_column',lit(1)) 
# lit means literal. It fills every row of the new column with the given literal value.
# It is good practice to use it when adding static data / constant values.
df.show(1,vertical=True)

-RECORD 0-------------------------------
 duration                    | 0        
 protocol_type               | tcp      
 service                     | ftp_data 
 flag                        | SF       
 src_bytes                   | 491      
 dst_bytes                   | 0        
 land                        | 0        
 wrong_fragment              | 0        
 urgent                      | 0        
 hot                         | 0        
 num_failed_logins           | 0        
 logged_in                   | 0        
 num_compromised             | 0        
 root_shell                  | 0        
 su_attempted                | 0        
 num_root                    | 0        
 num_file_creations          | 0        
 num_shells                  | 0        
 num_access_files            | 0        
 num_outbound_cmds           | 0        
 is_host_login               | 0        
 is_guest_login              | 0        
 count                       | 2        
 srv_count      

<h2>Renaming a Column in Your Dataframe</h2>

In [13]:
df = df.withColumnRenamed('first_column', 'new_column_one')

df.printSchema()


root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- num_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullable = true

<h2>Delete a Column from Your Dataframe</h2>

In [None]:
df = df.drop('new_column_one')
df.printSchema()

<h2>Lab: Run Spark on the Cloud</h2>

<h2>Create Clusters (e.g. Hadoop Clusters)</h2>

<ul>
<li>A cluster is a group of machines (servers or nodes) that work together, pooling the computational power of all the machines in it. It's difficult to build a local machine with 64GB of RAM and 20TB of storage, but that is not difficult when you are running on the cloud.</li>
<li>You may start by navigating to Dataproc and clicking on the Clusters section
<img src="http://stat.cmu.edu/~mfarag/652/lectures/l4/create_your_cluster.png"/>
</li>
</ul>

<h2>Cluster - Setup</h2>

<ul>
<li>Next, create your cluster and choose the "Google Compute Engine" option for it.</li>
</ul>

<img src="http://stat.cmu.edu/~mfarag/652/lectures/l4/create_cluster.png"/>

<h3>Cluster Configuration</h3>

<b>Make sure to follow the cluster creation guide posted on Canvas</b>

<h3>Running Cluster</h3>

Once you click the Create button, Google Cloud will create your cluster. If creation succeeds, you will see your cluster running.
<img src="http://stat.cmu.edu/~mfarag/652/lectures/l4/running_cluster.png"/>

<h3>Connect to Your Cluster</h3>

From the Web Interfaces, open Jupyter. Upload your notebook to the `GCS` folder and run the cells.