# Objective: Cover the basics of Spark and start working with PySpark
### Info Sources:
    - https://www.youtube.com/watch?v=QUiAc3rWtMA&t=1s - 11:30
    - https://www.guru99.com/pyspark-tutorial.html


### Apache Spark:
- Spark Core is the base of the engine and contains the basic functionality
- Spark works with RDDs (Resilient Distributed Datasets). Each dataset is divided into partitions that can be computed on different nodes of a cluster.
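
To make the idea of partitions more concrete, here is a plain-Python sketch (no Spark involved) that splits a dataset into chunks, processes each chunk independently and then combines the partial results. The helper `split_into_partitions` and the round-robin layout are assumptions made for illustration only; Spark decides its own partitioning.

```python
def split_into_partitions(data, num_partitions):
    """Distribute items round-robin over num_partitions lists (hypothetical helper)."""
    partitions = [[] for _ in range(num_partitions)]
    for i, item in enumerate(data):
        partitions[i % num_partitions].append(item)
    return partitions


data = list(range(1, 9))
partitions = split_into_partitions(data, 3)

# Each partition can be processed on its own, e.g. on a different node.
partial_sums = [sum(p) for p in partitions]

# Combining the partial results gives the same answer as processing everything at once.
total = sum(partial_sums)
print(partitions, partial_sums, total)
```

The point of the sketch is that the per-partition work does not need to see the other partitions, which is what allows it to be spread over a cluster.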

#### Spark SQL:
- Provides APIs to work with structured data
- Allows data to be manipulated via SQL commands
- Supports various data formats and sources: CSV, JSON, Parquet, Hive, Cassandra...

#### Spark Streaming:
- Enables processing of live data streams with very low latency
- Divides input data streams into batches
- For example, real-time processing of logs from an app server or tweets from Twitter
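
The batching idea can be sketched in plain Python (no Spark involved). The helper `micro_batches` and the sample log lines are hypothetical. As a simplification, the sketch cuts batches by item count, whereas Spark Streaming cuts them by a time interval.

```python
from itertools import islice


def micro_batches(stream, batch_size):
    """Yield consecutive lists of up to batch_size items from an iterable (hypothetical helper)."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Hypothetical log lines standing in for a live stream.
log_lines = ["GET /", "GET /login", "POST /login", "GET /home", "GET /logout"]

batches = list(micro_batches(log_lines, 2))

# Each batch is then processed like a small, ordinary dataset; here we just count requests.
counts = [len(batch) for batch in batches]
print(batches, counts)
```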

#### Spark MLlib:
- Built-in Machine Learning and AI libraries
- Data preprocessing + classification, regression, clustering...

#### Spark GraphX:
- Library for Graph manipulation and computations for big data




!['SparkSchema'](Spark_schema.png)


- Spark provides a layered architecture
- All layers and components are loosely coupled
- A Driver Program (which holds the SparkContext) runs on the master node of the Spark cluster
- The cluster manager allocates the resources of the cluster, so the application can work with data from different sources (nodes, cloud, etc.)
- The driver translates the operations on the RDDs into an execution graph, which is then split into tasks that run on the executors.

### The Role of an Executor in spark:
- Every application needs its own executor processes
- An executor performs the data processing
- It reads data from and writes data to external sources
- It interacts with storage systems

### Spark Deployment Modes:
- Standalone: Spark ships with its own cluster manager, so we don't need a third-party one.
- Mesos: can replace the Spark cluster manager.
- Spark on Hadoop YARN: Spark runs on a Hadoop cluster and YARN manages its resources.
- Amazon EC2 (Elastic Compute Cloud): we can launch a cluster on Amazon EC2 in about 5 minutes.
- Kubernetes

### Running Applications on YARN:
- Spark is preconfigured for YARN
- YARN controls resource management, scheduling and security when we run Spark

### Cluster Deployment Mode:
- Spark runs inside an ApplicationMaster managed by YARN
- A single process in a YARN container is responsible for driving the application and requesting resources from YARN

### Client Deployment Mode:
- The driver runs on the host where the job is submitted
- To request executor containers from YARN, the ApplicationMaster is used
- The client communicates with those containers to schedule the work once they start.


### Spark Shell
- Provides a simple way to learn the API
- Every SparkContext launches a web UI (User Interface)

In [1]:
import pyspark
from pyspark import SparkContext
sc = SparkContext()

In [2]:
sc

In [3]:
num_list = [1,2,3,4]
nums = sc.parallelize(num_list)

In [4]:
nums

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [5]:
# Transformation with lambda function:
# Map transformation
squared = nums.map(lambda x: x*x).collect()


In [6]:
nums.take(1)

[1]

In [7]:
type(squared)

list

In [8]:
type(nums)

pyspark.rdd.RDD

In [9]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

In [10]:
sqlContext = SQLContext(sc)

In [11]:
# Create a list of (name, age) tuples
names_ages = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
# Build an RDD out of it
rdd = sc.parallelize(names_ages)
# Convert each tuple into a Row
people = rdd.map(lambda x: Row(name=x[0], age=x[1]))


In [12]:
# Create a DataFrame from the RDD of Rows
DF_people = sqlContext.createDataFrame(people)

In [13]:
# To access the type of each feature (column), use printSchema()
DF_people.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



# Machine Learning with Spark

#### Step 1: Basic operations w/ PySpark

In [14]:
# Importing the toolset:
import pyspark
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkFiles

In [15]:
# Register the remote CSV file with Spark so that it is downloaded to every node
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
sc.addFile(url)

In [16]:
# Load the data into a dataframe.
# inferSchema=True means that Spark will try to guess the type of each column automatically.
df = sqlContext.read.csv(SparkFiles.get('adult_data.csv'), header=True, inferSchema=True)
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [17]:
df.show(5, truncate=False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

In [18]:
# Had we set the inferSchema kwarg to False, every column would be read as a string
# and we would need to cast the continuous variables to the right type ourselves.
from pyspark.sql.types import FloatType
df_strings = sqlContext.read.csv(SparkFiles.get('adult_data.csv'), header=True, inferSchema=False)

def convertColumn(df, names, newType):
    """Cast every column listed in `names` to `newType` and return the resulting DataFrame."""
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

# Each continuous column is listed once
continuous_features = ['age', 'fnlwgt', 'capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']




In [19]:
df_strings.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [20]:
convertColumn(df_strings, continuous_features, FloatType())

DataFrame[x: string, age: float, workclass: string, fnlwgt: float, education: string, educational-num: float, marital-status: string, occupation: string, relationship: string, race: string, gender: string, capital-gain: float, capital-loss: float, hours-per-week: float, native-country: string, income: string]

In [21]:
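from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol='income', outputCol='newIncome')
# fit() only learns the label-to-index mapping; df itself is left unchanged.
# The 'newIncome' column would only appear in the result of model.transform(df).
model = stringIndexer.fit(df)
df.printSchema()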

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
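
What `fit` learns can be sketched in plain Python. This is only an illustration of the default behaviour (labels ordered by descending frequency, so the most frequent label gets index 0.0), not Spark's implementation. The helper `fit_string_indexer` and the sample labels are hypothetical.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Map each label to a float index, most frequent label first (hypothetical helper)."""
    ordered = [label for label, _ in Counter(labels).most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}


# Made-up sample, not taken from the adult dataset.
incomes = ["<=50K", ">50K", "<=50K", "<=50K", ">50K"]

mapping = fit_string_indexer(incomes)      # the "fit" step: learn the mapping
indexed = [mapping[v] for v in incomes]    # the "transform" step: apply it
print(mapping, indexed)
```

Separating the two steps is what allows the same mapping to be reused later on new data.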



In [22]:
# Selecting columns
df.select('age', 'fnlwgt').show(1)

+---+------+
|age|fnlwgt|
+---+------+
| 25|226802|
+---+------+
only showing top 1 row



In [23]:
# Counting by group
df.groupBy("education").count().sort("count", ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [24]:
# Describing data
df.describe().show()
df.describe('capital-loss').show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|  

In [25]:
# Crosstabs
df.crosstab('age', 'income').sort('age_income').show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|        17|  595|   0|
|        18|  862|   0|
|        19| 1050|   3|
|        20| 1112|   1|
|        21| 1090|   6|
|        22| 1161|  17|
|        23| 1307|  22|
|        24| 1162|  44|
|        25| 1119|  76|
|        26| 1068|  85|
|        27| 1117| 115|
|        28| 1101| 179|
|        29| 1025| 198|
|        30| 1031| 247|
|        31| 1050| 275|
|        32|  957| 296|
|        33| 1045| 290|
|        34|  949| 354|
|        35|  997| 340|
|        36|  948| 400|
+----------+-----+----+
only showing top 20 rows
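
For readers new to crosstabs, the computation behind the table can be sketched in plain Python: count every (row value, column value) pair and lay the counts out as a grid. The helper `crosstab` and the four sample rows are hypothetical and are not taken from the adult dataset.

```python
from collections import Counter


def crosstab(rows, row_key, col_key):
    """Count co-occurrences of two fields; returns {row_value: {col_value: count}}."""
    pair_counts = Counter((row[row_key], row[col_key]) for row in rows)
    row_values = sorted({r for r, _ in pair_counts})
    col_values = sorted({c for _, c in pair_counts})
    # Pairs that never occur get a count of 0, as in the Spark output.
    return {r: {c: pair_counts[(r, c)] for c in col_values} for r in row_values}


people = [
    {"age": 17, "income": "<=50K"},
    {"age": 17, "income": "<=50K"},
    {"age": 30, "income": ">50K"},
    {"age": 30, "income": "<=50K"},
]

table = crosstab(people, "age", "income")
print(table)
```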



In [26]:
# Dropping columns

# drop() returns a new DataFrame without the given column
df_2 = df.drop('educational-num')
df_2.printSchema()
# .columns returns a list with the remaining column names
df_3 = df.drop('educational-num').columns
df_3
# dropna(how='any') returns a new DataFrame without the rows that contain any null value
# (how='all' would only drop the rows where every value is null)
df_4 = df.dropna(how='any')
df_4

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



DataFrame[x: int, age: int, workclass: string, fnlwgt: int, education: string, educational-num: int, marital-status: string, occupation: string, relationship: string, race: string, gender: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, income: string]

In [27]:
# Describing data by groups
df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+
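
The group-then-aggregate pattern used above can be sketched in plain Python as follows. The helper `group_mean` and the three sample rows are hypothetical and are not taken from the adult dataset.

```python
from collections import defaultdict


def group_mean(rows, group_key, value_key):
    """Return {group: mean of value_key} for the given rows (hypothetical helper)."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        totals[row[group_key]] += row[value_key]
        counts[row[group_key]] += 1
    return {group: totals[group] / counts[group] for group in totals}


rows = [
    {"marital-status": "Divorced", "capital-gain": 0},
    {"marital-status": "Divorced", "capital-gain": 1000},
    {"marital-status": "Widowed", "capital-gain": 500},
]

means = group_mean(rows, "marital-status", "capital-gain")
print(means)
```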



#### Step 2: Data Preprocessing

In [28]:
# Very young and retired households tend to have a lower income, so the relation
# between age and income is not linear. Adding the square of the age as a feature
# lets a linear model capture that shape.

# Adding a new column: age squared
from pyspark.sql.functions import col

# 1.- (Optional) Compute the new column on its own to inspect it:
#age_square = df.select(col('age')**2)

# 2.- Apply the transformation to the dataframe
df = df.withColumn('age_square', col('age')**2)


df = df.select(['x',
 'age',
 'age_square',
 'workclass',
 'fnlwgt',
 'education',
 'educational-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income'])

df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
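
A toy example may help show why the squared term matters. With a negative coefficient on `age_square`, a linear model can describe a curve that rises, peaks and falls again. The coefficients below are hypothetical values chosen only to illustrate the shape; they are not estimated from the adult dataset.

```python
# Hypothetical coefficients, chosen only for illustration.
BETA_AGE = 2.0
BETA_AGE_SQUARE = -0.02


def income_score(age):
    """Toy linear model with a squared term: b1 * age + b2 * age**2."""
    return BETA_AGE * age + BETA_AGE_SQUARE * age ** 2


# The curve peaks where the derivative b1 + 2 * b2 * age equals zero.
peak_age = -BETA_AGE / (2 * BETA_AGE_SQUARE)

for age in (20, 50, 80):
    print(age, income_score(age))
```

With only the `age` term the score could just increase or decrease with age; the squared term is what allows a peak in mid-life.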



In [29]:
# Excluding countries with a low number of observations, e.g. the Netherlands (a single row):
df.groupBy("native-country").count().sort("count",ascending=True).show()

+--------------------+-----+
|      native-country|count|
+--------------------+-----+
|  Holand-Netherlands|    1|
|             Hungary|   19|
|            Honduras|   20|
|            Scotland|   21|
|Outlying-US(Guam-...|   23|
|          Yugoslavia|   23|
|                Laos|   23|
|     Trinadad&Tobago|   27|
|            Cambodia|   28|
|            Thailand|   30|
|                Hong|   30|
|             Ireland|   37|
|              France|   38|
|             Ecuador|   45|
|                Peru|   46|
|              Greece|   49|
|           Nicaragua|   49|
|                Iran|   59|
|              Taiwan|   65|
|            Portugal|   67|
+--------------------+-----+
only showing top 20 rows



In [30]:
df.filter(df['native-country'] == 'Holand-Netherlands').count()
df_removed = df.filter(df['native-country'] != 'Holand-Netherlands')

#### Build a data processing pipeline:

A Pipeline is an API into which you push data: the various operations are carried out inside the pipeline and the output is then used to feed an algorithm.
For instance, one-hot encoding a string variable with OneHotEncoder (similar to pandas.get_dummies()).
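
The mechanics of a pipeline can be sketched in plain Python: each stage is fitted in order on the output of the previous stages, and the fitted stages are then applied in the same order. The classes `LabelIndexer` and `ToyPipeline` are hypothetical and much simpler than Spark's. For brevity `fit` returns the object itself rather than a separate model, and labels are indexed in order of first appearance rather than by frequency.

```python
class LabelIndexer:
    """Toy estimator: learns a label -> index mapping for one column."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = {}

    def fit(self, rows):
        for row in rows:
            self.mapping.setdefault(row[self.input_col], len(self.mapping))
        return self

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]


class ToyPipeline:
    """Fits each stage in order, feeding it the output of the previous stages."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        for stage in self.stages:
            rows = stage.fit(rows).transform(rows)
        return self

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


# Made-up rows, not taken from the adult dataset.
rows = [
    {"workclass": "Private", "income": "<=50K"},
    {"workclass": "Local-gov", "income": ">50K"},
]

pipeline = ToyPipeline([
    LabelIndexer("workclass", "workclass_index"),
    LabelIndexer("income", "newIncome"),
])
result = pipeline.fit(rows).transform(rows)
print(result)
```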

In [31]:
# StringIndexer + OneHotEncoder
# Note: this cell targets Spark 2.x, where OneHotEncoder is a plain transformer.
# In Spark 3.x OneHotEncoder is an estimator, so encoder.fit(indexed) is needed before transform().
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol='workclass', outputCol='workclass_encoded')
model = stringIndexer.fit(df)
# Adds a column where each category of the string variable is replaced by a numeric index
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol='workclass_encoded', outputCol='workclass_vec')
# Similarly to pandas.get_dummies(), builds a binary vector with a one at the
# position of the row's category and zeros everywhere else
encoded = encoder.transform(indexed)
encoded.show(2)

+---+---+----------+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
|  x|age|age_square|workclass|fnlwgt|education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|workclass_encoded|workclass_vec|
+---+---+----------+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
|  1| 25|     625.0|  Private|226802|     11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|              0.0|(9,[0],[1.0])|
|  2| 38|    1444.0|  Private| 89814|  HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband
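
The value `(9,[0],[1.0])` in `workclass_vec` is a sparse vector written as (size, indices, values): a vector of length 9 whose entry at index 0 is 1.0 and whose other entries are zero. The plain-Python sketch below expands that notation into a dense list; the helper `sparse_to_dense` is hypothetical and not part of PySpark.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse description into a plain list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# The value shown for 'Private' in workclass_vec: (9,[0],[1.0])
private_vec = sparse_to_dense(9, [0], [1.0])
print(private_vec)
```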

In [32]:
indexed.show(2)

+---+---+----------+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+
|  x|age|age_square|workclass|fnlwgt|education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|workclass_encoded|
+---+---+----------+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+
|  1| 25|     625.0|  Private|226802|     11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|              0.0|
|  2| 38|    1444.0|  Private| 89814|  HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|           0|           0|            50| 

In [33]:
# Building the Pipeline:
# 1.- Encode categorical data:
from pyspark.ml import Pipeline
# OneHotEncoderEstimator exists in Spark 2.3/2.4; in Spark 3.x it was renamed to OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator

CATEGORICAL_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = [] # Stages in our pipeline
for categoricalCol in CATEGORICAL_FEATURES:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + '_index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + '_classVec'])
    stages += [stringIndexer, encoder]

In [34]:
# 2.- Index the income feature. Spark does not accept string values for the label, so 'income' is converted to a numeric index
income_stringIdx = StringIndexer(inputCol='income', outputCol='newIncome')
stages += [income_stringIdx]


In [35]:
# 3.- Add the continuous variables:
assemblerInputs = [c + '_classVec' for c in CATEGORICAL_FEATURES] + continuous_features

In [36]:
# 4.- Assemble the steps
assembler = VectorAssembler(inputCols = assemblerInputs , outputCol = 'features')
stages += [assembler]
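
Conceptually, the assembler concatenates the one-hot vectors and the continuous columns of each row into one flat feature vector. The plain-Python sketch below illustrates this; the helper `assemble` and the sample row are hypothetical, and Spark produces a vector type rather than a Python list.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for col_name in input_cols:
        value = row[col_name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


# Made-up row with one encoded column and two continuous columns.
row = {"gender_classVec": [1.0, 0.0], "age": 25, "hours-per-week": 40}

features = assemble(row, ["gender_classVec", "age", "hours-per-week"])
print(features)
```

The order of the input columns fixes the position of each value in the resulting vector.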

In [37]:
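# Creating a Pipeline:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_removed)
# transform() returns a DataFrame (named `model` here) with the encoded columns and the 'features' vector added
model = pipelineModel.transform(df_removed)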

In [38]:
model

DataFrame[x: int, age: int, age_square: double, workclass: string, fnlwgt: int, education: string, educational-num: int, marital-status: string, occupation: string, relationship: string, race: string, gender: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, income: string, workclass_index: double, workclass_classVec: vector, education_index: double, education_classVec: vector, marital-status_index: double, marital-status_classVec: vector, occupation_index: double, occupation_classVec: vector, relationship_index: double, relationship_classVec: vector, race_index: double, race_classVec: vector, gender_index: double, gender_classVec: vector, native-country_index: double, native-country_classVec: vector, newIncome: double, features: vector]