#### Cheat Sheet (pyspark)
https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf

## Big Data

#### The three Vs

##### VOLUME
Volume refers to the amount of data a data-driven business generates through websites, portals and online applications. For online retailers in particular, volume covers all the available data that is out there and needs to be assessed for relevance.

##### VELOCITY
Velocity refers to the speed at which data is generated. As internet speeds and the number of users have increased, velocity has also increased substantially.

##### VARIETY
Variety in Big Data refers to all the structured and unstructured data that can be generated by humans or by machines. Structured data is any data you could store in a spreadsheet: it can easily be cataloged, and summary statistics can be calculated for it. Unstructured data consists of raw material such as texts, tweets, pictures, videos, emails, voice mails, hand-written text, ECG readings, and audio recordings. Unstructured data is hard to analyze directly, so it is usually up to data scientists to impose some organization and structure on it.

## Parallel and Distributed Computing with Map-Reduce

MapReduce is a programming paradigm that allows big data analytics to scale across hundreds or thousands of servers. The underlying concept can be somewhat difficult to grasp because this paradigm differs from traditional programming practices. This lesson aims to present a simple yet intuitive account of MapReduce that we shall put into practice in upcoming labs.

*In a nutshell, the term "MapReduce" refers to two distinct tasks. The first is the __Map__ job, which takes one set of data and transforms it into another set of data, where individual elements are broken down into tuples __(key/value pairs)__, while the __Reduce__ job takes the output from a map as input and combines those data tuples into a smaller set of tuples.*

#### Distributed Processing Systems
>A distributed processing system is a group of computers in a network working in tandem to accomplish a task

#### Parallel Processing
With parallel computing:

* a larger problem is broken up into smaller pieces
* every part of the problem follows a series of instructions
* each one of the instructions is executed simultaneously on different processors
* all of the answers are collected from the small problems and combined into one final answer
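To make these four steps concrete, here is a minimal single-machine sketch in plain Python (no Spark involved): it breaks a list into chunks, has every worker run the same function on its chunk, and combines the partial answers. It uses `concurrent.futures.ThreadPoolExecutor` for portability; because of CPython's global interpreter lock, CPU-bound threads do not truly run simultaneously, so a `ProcessPoolExecutor` would be the choice for a real parallel speed-up. The function names are illustrative, not from any library.

```python
from concurrent.futures import ThreadPoolExecutor


def split(data, n_parts):
    """Break a larger problem (a list) into n_parts roughly equal pieces."""
    size = max(1, -(-len(data) // n_parts))  # ceiling division, at least 1
    return [data[i:i + size] for i in range(0, len(data), size)]


def solve_piece(chunk):
    """Every piece follows the same instructions: sum the squares of its items."""
    return sum(x * x for x in chunk)


def parallel_sum_of_squares(data, n_workers=4):
    chunks = split(data, n_workers)
    # Each chunk is handed to a separate worker
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partial_results = list(pool.map(solve_piece, chunks))
    # Collect the small answers and combine them into one final answer
    return sum(partial_results)


print(parallel_sum_of_squares(list(range(1, 101))))  # 338350
```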

#### MapReduce process

##### 1. MAP Task (Splitting & Mapping)
The dataset that needs processing must first be transformed into key:value pairs and split into fragments, which are then assigned to map tasks. Each computing cluster is assigned a number of map tasks, which are subsequently distributed among its nodes. In this example, let's assume that we are using 5 nodes (e.g. a server with 5 different workers).

First, split the data from one or more files into as many fragments as there are nodes.

We will then use the map function to create key value pairs represented by:   
*{animal}* , *{# of animals per zoo}* 

After processing of the original key:value pairs, some __intermediate__ key:value pairs are generated. The intermediate key:value pairs are __sorted by their key values__ to create a new list of key:value pairs.

##### 2. Shuffling
The list produced by the map task is divided into a new set of fragments: the mapped pairs are sorted and shuffled so that all values sharing the same key end up in the same fragment, which makes them easy to reduce. __The number of these new fragments is the same as the number of reduce tasks__.

##### 3. REDUCE Task (Reducing)
Now, every properly shuffled segment will have a reduce task applied to it. After the task is completed, the final output is written onto a file system. The underlying file system is usually HDFS (Hadoop Distributed File System). 
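The three steps can be simulated in plain Python on one machine, which makes the data flow easy to follow. This is a minimal sketch, not Hadoop code: the zoo records are hypothetical, and the shuffle routes each key to a reducer with `zlib.crc32(key) % n_reducers`, an assumption chosen to mirror the hash-partitioning idea while staying deterministic across runs (Python's built-in `hash()` of strings is randomized per process).

```python
import zlib
from collections import defaultdict

# Hypothetical input: each split is one zoo's inventory, handed to one map task
splits = [
    ["lion 2", "tiger 1", "zebra 10"],
    ["zebra 4", "lion 3"],
    ["tiger 2", "penguin 12"],
]


def map_task(lines):
    """Map: turn raw records into intermediate (animal, count) pairs, sorted by key."""
    pairs = []
    for line in lines:
        animal, count = line.split()
        pairs.append((animal, int(count)))
    return sorted(pairs)


def shuffle(mapped_outputs, n_reducers):
    """Shuffle: create one fragment per reduce task; all values of a key land in the same fragment."""
    fragments = [defaultdict(list) for _ in range(n_reducers)]
    for pairs in mapped_outputs:
        for animal, count in pairs:
            idx = zlib.crc32(animal.encode()) % n_reducers
            fragments[idx][animal].append(count)
    return fragments


def reduce_task(fragment):
    """Reduce: combine the list of values for each key into a single total."""
    return {animal: sum(counts) for animal, counts in fragment.items()}


mapped = [map_task(s) for s in splits]
fragments = shuffle(mapped, n_reducers=2)

result = {}
for fragment in fragments:
    result.update(reduce_task(fragment))  # each reducer's output is written out separately

print(dict(sorted(result.items())))
# {'lion': 5, 'penguin': 12, 'tiger': 3, 'zebra': 14}
```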

It's important to note that MapReduce is generally only worthwhile when dealing with large amounts of data. On a small dataset, the overhead of splitting, shuffling and coordinating the work outweighs the gains, so it will be faster to perform operations outside the MapReduce framework.

Two groups of entities in this process ensure that the MapReduce job gets done properly:

__Job Tracker__: a "master" node that informs the other nodes which map and reduce jobs to complete

__Task Tracker__: the "worker" nodes that complete the map and reduce operations

There are different names for these components depending on the technology used, but there will always be a master node that informs worker nodes what tasks to perform.

## Spark Context

#### SparkConf
- configures a Spark Context including Java properties

        pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)

#### Create a local spark context with pyspark
    import pyspark
    sc = pyspark.SparkContext('local[*]')

#### Display the type of the Spark Context
    type(sc)

#### Use Python's dir(obj) to get a list of all attributes of SparkContext
    dir(sc)

#### Use Python's help ( help(object) ) function to get information on attributes and methods for sc object. 
    help(sc)

#### Check the number of cores being used
    print ("Default number of cores being used:", sc.defaultParallelism) 

#### Check for the current version of Spark
    print ("Current version of Spark:", sc.version)
    
#### Check the name of application currently running in spark environment
    sc.appName
    
#### Access complete configuration settings (including all defaults) for the current spark context 
    sc.getConf().getAll()
    
#### Shut down SparkContext
    sc.stop()

## Spark DataFrame
- The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). DataFrames are not only easier to understand than RDDs but also better optimized for complicated operations.
- To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.

        # Import SparkSession from pyspark.sql
        from pyspark.sql import SparkSession

        # Create (or reuse) a SparkSession; the snippets below refer to it as `spark`
        spark = SparkSession.builder.getOrCreate()

        # Print spark
        print(spark)
#### catalog
- spark.catalog.listTables() lists all tables and temporary views registered in your session's catalog

        print(spark.catalog.listTables())
        
#### queries
- SQL query format

        # create a SQL query
        query = "FROM flights SELECT * LIMIT 10"
        # Get the first 10 rows of flights
        flights10 = spark.sql(query)
        # Show the results
        flights10.show()
#### Spark DataFrame to pandas DataFrame
- convert with the .toPandas() method

        # create a query
        query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"
        # Run the query
        flight_counts = spark.sql(query)
        # Convert the results to a pandas DataFrame
        pd_counts = flight_counts.toPandas()
        # Print the head of pd_counts
        print(pd_counts.head())
#### pandas DataFrame to Spark DataFrame
- the spark.createDataFrame() method converts a pandas DataFrame into a Spark DataFrame
        
        spark_temp = spark.createDataFrame(pd_temp)
#### Temporary View
- registers the DataFrame as a temporary view: a named table that exists only for the current SparkSession, can be queried with spark.sql(), and is listed by spark.catalog.listTables()

        spark_temp.createOrReplaceTempView('new_table_name')
#### Direct read-in of datasources
- spark.read loads several file types (e.g. CSV, JSON, Parquet) directly into a Spark DataFrame

        #  file path to .csv
        file_path = "/usr/local/share/datasets/airports.csv"

        # Read in the airports data
        airports = spark.read.csv(file_path, header=True)

        # Show the data
        airports.show()
#### creating columns
- In Spark you can do this using the .withColumn() method, which takes two arguments: first, a string with the name of your new column, and second, a Column expression that computes its values
- Spark DataFrames are immutable, so adding a new column means reassigning the DataFrame returned by .withColumn().

        import pyspark.sql.functions as F
        df = df.withColumn("newCol", F.lit(df.agg(F.mean("column")).first()[0]))  # new column filled with the mean of another column
#### filtering results
- similar to the WHERE clause in SQL
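- two ways of filtering:
    - passing a string containing a SQL expression keeps the rows for which the expression is true
    - passing a boolean Column expression built from df.column (e.g. df.column > value) keeps the rows where it evaluates to true; it does not add a column to the DataFrame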
    
            # Filter flights by passing a string
            long_flights1 = flights.filter("distance > 1000")

            # Filter flights by passing a column of boolean values
            long_flights2 = flights.filter(flights.distance > 1000)
#### select columns
- similar to SQL SELECT statement
- takes multiple arguments - one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the df.colName syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it
- can also use column-wise operations 

        # Select columns using column strings
        selected1 = flights.select('tailnum', 'origin', 'dest')

        # Select columns using df.column format
        temp = flights.select(flights.origin, flights.dest, flights.carrier)
        #using operations to create a column
        avg_speed = (flights.distance/(flights.air_time/60))
        
#### alias
- renames a column when selecting

        #rename column to avg_speed
        avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")
#### aggregating
- you can aggregate with common methods by using the .groupBy() method
- .groupBy() returns a pyspark.sql.GroupedData object, on which you call aggregation methods such as .min(), .max(), .avg() and .count()

        #using filter and groupBy to find the shortest flight from PDX
        flights.filter(flights.origin == 'PDX').groupBy().min("distance").show()
- you can also use the .agg() method, which accepts any aggregate function from the pyspark.sql.functions module

        #import functions
        import pyspark.sql.functions as F
        #find standard deviation of a column
        df.agg(F.stddev("column")).show()
#### joining
- performed using the df.join() method; the `how` argument takes values such as 'inner', 'left', 'right' or 'outer'
    
        joined_df = df.join(joining_df, on='joining_column', how='how_to_join')

## Resilient Distributed Datasets (RDDs)

Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark. An RDD is, essentially, the Spark representation of a set of data, spread across multiple machines, with APIs to let you act on it. An RDD could come from any data source, e.g. text files, a database, a JSON file, etc.

#### create RDD
    rdd = sc.parallelize(data,numSlices=10) #creates 10 partitions
    print(type(rdd))
    
#### Get # of partitions
    rdd.getNumPartitions()
    
#### Basic actions
    rdd.count() #returns the total count of items in the RDD
    rdd.first() #returns the first item in the RDD
    rdd.take(n) #returns the first n items in the RDD
    rdd.top(n) #returns the n largest items, in descending order
    rdd.collect() #returns everything from your RDD to the driver (use with care on large data)
    
#### Mapping a function over the data (applies it to every item and returns a new RDD)
    def sales_tax(num):
        return num * 0.92

    revenue_minus_tax = price_items.map(sales_tax)
    
#### Applying lambda function to data
    discounted = revenue_minus_tax.map(lambda x : x*0.9)
    
#### chain methods in spark
    price_items.map(sales_tax).map(lambda x : x*0.9).top(15)
    
#### See the full lineage of all the operations that have been performed on an RDD
    discounted.toDebugString()
    
#### flatMap (each item can produce several outputs, which are flattened into one RDD at the same level)
    flat_mapped = price_items.flatMap(lambda x : (x, x*0.92*0.9 ))

#### A filter method is a specialized form of a map function that only returns the items that match certain criteria
    selected_items = discounted.filter(lambda x: x>300)
    
#### Use a reduce method with a lambda function to add up all of the values in the RDD
    selected_items.reduce(lambda x,y :x + y)
    
#### reduceByKey to perform reducing operations while grouping by keys.
    total_spent = sales_data.reduceByKey(lambda x,y :x + y)
    
####  sortBy method on the RDD to rank the users from highest spending to least spending.
    total_spent.sortBy(lambda x: x[1],ascending = False).collect()


## Machine Learning in Spark

### ml and mllib machine learning libraries

[mllib](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html)
- mllib library is built upon the RDDs 
[ml](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html)
- ml library is built on Spark DataFrames


    
#### ml library

- its developers used scikit-learn as the inspiration for the library's design. As a result, many of the methods and functionalities look similar, but there are some crucial distinctions. There are two main classes:

`Transformer`
- An algorithm that transforms one pyspark DataFrame into another DataFrame.
- uses the .transform() method
- returns a new dataframe, usually with a new column appended
- examples of classes: Bucketizer, VectorAssembler, PCAModel (the fitted result of PCA)

`Estimator`
- An algorithm that is fit on a pyspark DataFrame and produces a Transformer (e.g. a fitted model).
- uses the .fit() method
- returns a model object
- examples of classes: StringIndexer, PCA, RandomForestRegressor

#### ml library machine learning steps

##### ensure proper data types
- only takes numbers (decimals, whole numbers), called 'doubles' and 'integers' in Spark
- columns may be read in as strings (for CSV files, types are only guessed with inferSchema=True); if a column is not recognized as a double or integer, use the .cast() method in combination with the .withColumn() method to convert its datatype ("integer", "double")

        #overwrite the column with its numeric version ('integer' or 'double')
        original_df = original_df.withColumn("col_name", original_df.col_name.cast("double"))
##### handle categorical variables
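- there are two steps to take to one-hot-encode categorical variables:
- step 1: use StringIndexer to map each distinct string value to a numeric index in a new column

        #create stringindexer
        from pyspark.ml.feature import StringIndexer
        indexer = StringIndexer(inputCol='col_name', outputCol='col_name_idx')
- step 2: use OneHotEncoder to turn that index column into a one-hot-encoded vector column

        #create ohe (its input is the StringIndexer's output column)
        from pyspark.ml.feature import OneHotEncoder
        encoder = OneHotEncoder(inputCol='col_name_idx', outputCol='col_name_vec')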
##### assemble a vector
- Spark modeling requires all columns containing features to be combined in one column
- done using VectorAssembler

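        #combine the feature columns into a single vector column ('features' is the models' default featuresCol)
        from pyspark.ml.feature import VectorAssembler
        assembler = VectorAssembler(inputCols=['list', 'of', 'col', 'names'], outputCol='features')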
##### create a pipeline
- combines all estimators and transformers
- wraps the process so you can reuse the named pipeline

        #import pipeline
        from pyspark.ml import Pipeline
        #make pipeline
        pipeline = Pipeline(stages=[indexer, encoder, assembler])
##### fit and transform data

        #fit and transform
        piped_data = pipeline.fit(model_data).transform(model_data)
##### split data into train and test sets

        #split data
        training, test = piped_data.randomSplit([.7,.3])
##### run selected model
- there are many models to choose from, ensure proper package is imported and instantiated
##### select evaluation method
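- select an evaluator from pyspark.ml.evaluation that matches the task (e.g. BinaryClassificationEvaluator, RegressionEvaluator)

        from pyspark.ml.evaluation import BinaryClassificationEvaluator
        evaluator = BinaryClassificationEvaluator()  # e.g. for a binary classifier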
##### hyperparameter tuning
- performed using the pyspark.ml.tuning library
- if several parameters will be tuned, use the ParamGridBuilder class

        #import tuning package
        import pyspark.ml.tuning as tune
        #create grid
        grid = tune.ParamGridBuilder()
        #add one addGrid() call per hyperparameter (param_a/param_b stand in for real model params, e.g. regParam)
        grid = grid.addGrid(model.param_a, [0,1,2])
        grid = grid.addGrid(model.param_b, [0,1,2])
        #build the grid
        grid = grid.build()
##### create the validator
- the tuning module also contains the CrossValidator class, which uses your selected evaluator to score every parameter combination in the grid

        #create crossvalidator
        cv = tune.CrossValidator(estimator=model_name,
                                 estimatorParamMaps=grid,
                                 evaluator=evaluator)
##### fit training data
    #fitting the CrossValidator tries every grid combination and keeps the best model
    best_model = cv.fit(training)
##### predict test data
    results = best_model.transform(test)
##### evaluate model
    print(evaluator.evaluate(results))

#### Sample ml library Pipeline

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.ml.regression import RandomForestRegressor
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import RegressionEvaluator

    #set pipeline parameters
    #`features` is a list of feature column names (including 'month_vec') defined elsewhere
    string_indexer = StringIndexer(inputCol='month',outputCol='month_num',handleInvalid='keep')
    #OneHotEncoder replaces OneHotEncoderEstimator in Spark 3.0+
    one_hot_encoder = OneHotEncoder(inputCols=['month_num'],outputCols=['month_vec'])
    vector_assembler = VectorAssembler(inputCols=features,outputCol='features')
    random_forest = RandomForestRegressor(featuresCol='features',labelCol='area')
    stages = [string_indexer, one_hot_encoder, vector_assembler, random_forest]

    pipeline = Pipeline(stages=stages) #instantiate pipeline

    params = ParamGridBuilder()\
    .addGrid(random_forest.maxDepth, [5,10,15])\
    .addGrid(random_forest.numTrees, [20,50,100])\
    .build() #builds the 3 x 3 = 9 parameter combinations to search

    reg_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area',metricName = 'mae') #evaluates model

    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params,evaluator=reg_evaluator)

    cross_validated_model = cv.fit(spark_df) #fits model

    cross_validated_model.avgMetrics #average cross-validated metric (here MAE) for each parameter combination

    #shows selected predictions (on the same data the model was fit on)
    predictions = cross_validated_model.transform(spark_df)
    predictions.select('prediction','area').show(300)

    cross_validated_model.bestModel.stages #checking best model by stage

    optimal_rf_model = cross_validated_model.bestModel.stages[3] #index 3 = fourth stage, the fitted random forest

    optimal_rf_model.featureImportances #checking feature importance

## Simple Spark Word Count Function

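    # assumes an active SparkContext named `sc`
    stopWordList = ['', 'the','a','in','of','on','at','for','by','i','you','me']
    def wordCount(filename, stopWordList):
        lines = sc.textFile(filename)                      # one RDD item per line of text
        words1 = lines.flatMap(lambda x: x.split(' '))     # split every line into words
        words2 = words1.map(lambda x: (x.lower(), 1))      # (word, 1) pairs
        counts = words2.reduceByKey(lambda x,y: x+y)       # total occurrences per word
        freqWords = counts.filter(lambda x:  x[1] >= 5 )   # keep words seen at least 5 times
        stopWords = freqWords.filter(lambda x:  x[0] in stopWordList)  # keep only the stop words (use `not in` to drop them instead)
        output = stopWords.collect()

        return output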