### All the material is already on GitHub, together with the source files

# For truly large files (> 100 GB) you can't do the processing on your laptop
* options: Hadoop (with higher-level tools such as Pig) and Spark
* the idea is the same as multiprocessing, but on a much larger scale (clusters with many CPUs, many drives, and lots of RAM)
* these tools are complex, but it's important that you at least hear about them and try them
* links: https://www.ibm.com/cloud/blog/hadoop-vs-spark https://phoenixnap.com/kb/hadoop-vs-spark

# Hadoop
Apache Hadoop is a platform that started as a Yahoo project in 2006 and later became a top-level Apache open-source project. It processes large datasets in a distributed fashion. The Hadoop ecosystem is highly fault-tolerant: instead of relying on specialized hardware for high availability, it is designed to detect and handle failures at the application layer. It is a general-purpose distributed processing framework with several components:

* Hadoop Distributed File System (HDFS): splits files into blocks and distributes (and replicates) them across the machines of a Hadoop cluster, so that large datasets can be stored and read in parallel. Hadoop can handle both structured and unstructured data.

* Hadoop has its own file system and storage system

* MapReduce: the programming model that processes the data in parallel and then combines the partial results into the desired result.

* Hadoop is written in Java and is accessible from many programming languages for writing MapReduce code, including Python (through a Thrift client). It is available either open source through the Apache distribution, or through vendors such as Cloudera (the largest Hadoop vendor by size and scope), MapR, or Hortonworks.
* the core pattern: `MAP` → `REDUCE`

![](imgs/hadoop_fs.jpg)

![](imgs/Map_Reduce.png)
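The `MAP` → `REDUCE` pattern can be sketched in plain Python, with no Hadoop involved. This is a minimal single-machine illustration, not Hadoop's API: the three documents are made-up sample data, and `map_phase`, `shuffle_phase` and `reduce_phase` are hypothetical helper names. Hadoop runs the same three steps, but spreads the map and reduce work over many machines and performs the shuffle over the network.

```python
from collections import defaultdict
from functools import reduce

# Made-up sample input: each string plays the role of one input file
documents = ["spark and hadoop", "hadoop map reduce", "spark in memory"]

def map_phase(doc):
    # Map: emit a (word, 1) pair for every word in the document
    return [(word, 1) for word in doc.split()]

def shuffle_phase(pairs):
    # Shuffle: group together all values that share the same key
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    # Reduce: combine the grouped values of each key into one result
    return {key: reduce(lambda a, b: a + b, values) for key, values in groups.items()}

mapped = [pair for doc in documents for pair in map_phase(doc)]
counts = reduce_phase(shuffle_phase(mapped))
print(counts)
# {'spark': 2, 'and': 1, 'hadoop': 2, 'map': 1, 'reduce': 1, 'in': 1, 'memory': 1}
```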

# Spark

Apache Spark is an open-source tool. It is a newer project, started in 2009 at the AMPLab at UC Berkeley. Like Hadoop, it processes data in parallel across a cluster, but the biggest difference is that it works in memory: it is designed to use RAM for caching and processing the data. Spark handles several types of big data workloads (batch processing, SQL queries, streaming, machine learning, graph processing), which correspond to the components listed below.

Unlike Hadoop, Spark does not have its own storage system, so it reads data from an external storage layer such as HDFS, cloud object storage, or the local file system (as in the examples below).

### RDD
Spark is built around Resilient Distributed Datasets (RDDs). The RDD is the key abstraction that lets Spark logically partition a dataset during computation and process the partitions in parallel, which makes data processing fast.

RDD stands for Resilient Distributed Dataset, and each term describes one of its properties:

* Resilient: it is fault tolerant thanks to the RDD lineage graph (a DAG of the operations that produced it), so lost partitions can be recomputed if a node fails.
* Distributed: the data of an RDD resides on multiple nodes.
* Dataset: the records of data that you work with.


### Five main components of Apache Spark:

* Apache Spark Core: It is responsible for functions like scheduling, input and output operations, task dispatching, etc.
* Spark SQL: lets you work with structured data using SQL queries or the DataFrame API, and uses the data's schema to optimize how the data is processed.
* Spark Streaming: This component enables the processing of live data streams. 
* Machine Learning Library (MLlib): provides scalable machine learning algorithms, with the goal of making machine learning on large data more accessible.
* GraphX: This has a set of APIs that are used for facilitating graph analytics tasks.

### Features of Spark

1. Fast processing: Spark keeps intermediate results in memory (through RDDs) instead of writing them to disk between steps. This saves time on reading and writing and can make it roughly ten to one hundred times faster than Hadoop MapReduce for some workloads.

2. Flexibility: Apache Spark supports multiple languages, so developers can write applications in Java, Scala, R, or Python.

3. In-memory computing: Spark stores the data in the RAM of the servers, which allows quick access and in turn speeds up analytics.

4. Real-time processing: Spark can process real-time streaming data, unlike MapReduce, which processes only stored data.

5. Better analytics: Apache Spark offers a rich set of tools for SQL queries, machine learning algorithms, complex analytics, etc.

# Comparison

![](imgs/Hadoop_Spark_1.png)

# Spark is getting popular; for us, the way in is PySpark (you can even install it on your PC)
* when it makes sense to use Spark
* if you have access to a cluster
* difference between Spark and multiprocessing:
    * Spark is a much more powerful and robust distributed computing framework than plain multiprocessing.
    * Spark automatically distributes, partitions, and rescales parallel tasks. Scaling and scheduling Spark code is easier than writing custom multiprocessing code that has to cope with larger amounts of data and computation.

* there is also "Dask", but only if you want to learn another library

![](imgs/Pandas_Dask_PySpark.png)
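To make the difference concrete, here is a hypothetical sketch of summing the fruit costs with plain `multiprocessing` from the standard library. With multiprocessing you decide how to split the data, start the worker processes, and combine the partial results yourself, and everything stays on one machine. In PySpark the same job is roughly `spark.sparkContext.parallelize(costs).sum()`, and Spark takes care of partitioning, scheduling, and (on a cluster) spreading the work across machines. The helper names `chunked` and `parallel_sum` are made up for this example.

```python
from multiprocessing import Pool

def chunked(values, n_chunks):
    """Manually partition a list into at most n_chunks slices."""
    size = max(1, -(-len(values) // n_chunks))  # ceiling division, at least 1
    return [values[i:i + size] for i in range(0, len(values), size)]

def parallel_sum(values, n_workers=4):
    chunks = chunked(values, n_workers)         # we choose the partitioning
    with Pool(processes=n_workers) as pool:     # we manage the worker processes
        partial_sums = pool.map(sum, chunks)    # each worker sums one chunk
    return sum(partial_sums)                    # we combine the results ourselves

if __name__ == "__main__":
    costs = [67.89, 87.67, 64.76, 87.00, 69.56, 234.67, 143.00, 49.0]
    print(round(parallel_sum(costs, n_workers=4), 2))  # 803.55
```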

Spark is a cluster computing system. Compared with other cluster computing systems such as Hadoop MapReduce, it is faster, and writing parallel jobs in it is simple. Spark is written in Scala and provides high-level APIs in Python, Scala, Java, and R. It is one of the most active Apache projects. In Spark, DataFrames are distributed collections of data organized into rows and columns; each column has a name and a type.

# Features of PySpark
* Spark supports the map-reduce style of processing
* PySpark DataFrames are immutable (they cannot be changed once created) and fault-tolerant, and transformations are lazily evaluated (they are not executed until an action is called).
* PySpark DataFrames are distributed across the cluster (the rows of a DataFrame are stored on different machines), and operations on them run in parallel on all machines.
* PySpark can run on multiple cores and machines, which pandas cannot do on its own.
* PySpark supports SQL queries to run transformations. All you need to do is create a Table/View from the PySpark DataFrame.

## You can install PySpark on your computer, but it can be tedious (let me know if it worked for you)

## Comparing Pandas and PySpark

```python
import pandas as pd
import numpy as np
```

```python
data = [{'fruit': 'apple', 'cost': 67.89, 'city': 'sao paulo'},
        {'fruit': 'mango', 'cost': 87.67, 'city': 'brasilia'},
        {'fruit': 'apple', 'cost': 64.76, 'city': 'araraquara'},
        {'fruit': 'banana','cost': 87.00, 'city': 'bertioga'},
        {'fruit': 'guava', 'cost': 69.56, 'city': 'brasilia'},
        {'fruit': 'mango', 'cost': 234.67, 'city': 'sao paulo'},
        {'fruit': 'apple', 'cost': 143.00, 'city': 'brasilia'},
        {'fruit': 'mango', 'cost': 49.0, 'city': 'bertioga'}]
```

```python
df_Pandas = pd.DataFrame(data)
```

```python
df_Pandas.head()
```

|   | fruit  | cost  | city       |
|---|--------|-------|------------|
| 0 | apple  | 67.89 | sao paulo  |
| 1 | mango  | 87.67 | brasilia   |
| 2 | apple  | 64.76 | araraquara |
| 3 | banana | 87.0  | bertioga   |
| 4 | guava  | 69.56 | brasilia   |


```python
df_Pandas['cost'].sum()
```
```
803.55
```

```python
df_Pandas.groupby('city').count()
```

| city       | fruit | cost |
|------------|-------|------|
| araraquara | 1     | 1    |
| bertioga   | 2     | 2    |
| brasilia   | 3     | 3    |
| sao paulo  | 2     | 2    |


```python
# import the pyspark module
import pyspark

# import SparkSession from the pyspark.sql module
from pyspark.sql import SparkSession

# create (or reuse) a SparkSession and give the app a name
spark = SparkSession.builder.appName('statistics_globe').getOrCreate()

# create a DataFrame from the list of dictionaries defined above
df_Spark = spark.createDataFrame(data)

# display the DataFrame
df_Spark.show()
```
```
+----------+------+------+
|      city|  cost| fruit|
+----------+------+------+
| sao paulo| 67.89| apple|
|  brasilia| 87.67| mango|
|araraquara| 64.76| apple|
|  bertioga|  87.0|banana|
|  brasilia| 69.56| guava|
| sao paulo|234.67| mango|
|  brasilia| 143.0| apple|
|  bertioga|  49.0| mango|
+----------+------+------+
```



```python
# namespaced import: avoids shadowing Python's built-in sum and max
import pyspark.sql.functions as F

df_Spark.select(F.sum('cost')).collect()
```
```
[Row(sum(cost)=803.55)]
```

```python
df_Spark.select(F.mean('cost')).collect()
```
```
[Row(avg(cost)=100.44375)]
```

```python
df_Spark.select(F.max('cost')).collect()
```
```
[Row(max(cost)=234.67)]
```

```python
df_Spark.groupBy('city').count().show()
```
```
+----------+-----+
|      city|count|
+----------+-----+
|  bertioga|    2|
|  brasilia|    3|
| sao paulo|    2|
|araraquara|    1|
+----------+-----+
```



```python
print("Partitions: " + str(df_Spark.rdd.getNumPartitions()))
print("-" * 80)
```
```
Partitions: 4
--------------------------------------------------------------------------------
```


```python
df_Spark.printSchema()
```
```
root
 |-- city: string (nullable = true)
 |-- cost: double (nullable = true)
 |-- fruit: string (nullable = true)
```



* PySpark supports SQL queries to run transformations. All you need to do is create a `Table/View` from the PySpark DataFrame.

```python
df_Spark.createOrReplaceTempView("Prices")
```

```python
spark.sql("select * from Prices where cost > 70").show()
spark.sql("select mean(cost), max(cost) from Prices").show()
```
```
+---------+------+------+
|     city|  cost| fruit|
+---------+------+------+
| brasilia| 87.67| mango|
| bertioga|  87.0|banana|
|sao paulo|234.67| mango|
| brasilia| 143.0| apple|
+---------+------+------+

+----------+---------+
|mean(cost)|max(cost)|
+----------+---------+
| 100.44375|   234.67|
+----------+---------+
```



# PySpark and large files: Wikipedia views per second

```python
# A reference to our tab-separated file
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('statistics_globe').getOrCreate()
csvFile = "pageviews_by_second.tsv"

tempDF = (spark.read               # the DataFrameReader
   .option('header', 'true')       # the first line holds the column names
   .option("sep", "\t")            # use a tab delimiter (the default is a comma)
   .csv(csvFile)                   # create a DataFrame from the delimited file
)
```

```python
tempDF.show()
```
```
+-------------------+-------+--------+
|          timestamp|   site|requests|
+-------------------+-------+--------+
|2015-03-16T00:09:55| mobile|    1595|
|2015-03-16T00:10:39| mobile|    1544|
|2015-03-16T00:19:39|desktop|    2460|
|2015-03-16T00:38:11|desktop|    2237|
|2015-03-16T00:42:40| mobile|    1656|
|2015-03-16T00:52:24|desktop|    2452|
|2015-03-16T00:54:16| mobile|    1654|
|2015-03-16T01:18:11| mobile|    1720|
|2015-03-16T01:30:32|desktop|    2288|
|2015-03-16T01:32:24| mobile|    1609|
|2015-03-16T01:42:08|desktop|    2341|
|2015-03-16T01:45:53| mobile|    1704|
|2015-03-16T01:55:37|desktop|    2554|
|2015-03-16T01:57:29| mobile|    1825|
|2015-03-16T02:03:16|desktop|    2492|
|2015-03-16T02:10:32| mobile|    1667|
|2015-03-16T02:16:45|desktop|    2452|
|2015-03-16T02:19:32|desktop|    2412|
|2015-03-16T02:20:16|desktop|    2350|
|2015-03-16T02:22:08| mobile|    1802|
+-------------------+-------+--------+
only showing top 20 rows
```



```python
import time

s_time = time.time()
tempDF.groupBy('site').count().show()
e_time = time.time()
print("Read without chunks: ", (e_time - s_time), "seconds")
```
```
+-------+-------+
|   site|  count|
+-------+-------+
|desktop|3600000|
| mobile|3600000|
+-------+-------+

Read without chunks:  5.104434251785278 seconds
```


# PySpark: huge files (just for a taste)

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('statistics_globe').getOrCreate()
csvFile = "huge.csv"

tempDF = (spark.read               # the DataFrameReader
   .option('header', 'true')       # the first line holds the column names
   .option("sep", ",")             # comma delimiter (also Spark's default)
   .csv(csvFile)                   # create a DataFrame from the CSV file
)
```


```python
tempDF.show()
```
```
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|  33.20|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    null| 543.10|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:...|      view|   1004237|205301355563188265
```

```python
# namespaced import: avoids shadowing Python's built-in sum
import pyspark.sql.functions as F

tempDF.select(F.sum('price')).collect()
```
```
[Row(sum(price)=12323880556.028587)]
```

# Spark/PySpark on Microsoft Azure (DataBricks)

### Databricks

Deploy an Azure Databricks workspace
1. Open the Azure portal
2. Click Create a Resource in the top left
3. Search for "Databricks"
4. Select Azure Databricks
5. On the Azure Databricks page select Create
    
6. Provide the required values to create your Azure Databricks workspace:
    * Subscription: choose the Azure subscription in which to deploy the workspace.
    * Resource Group: use Create new and provide a name for the new resource group.
    * Location: select a location near you for deployment. For the list of regions supported by Azure Databricks, see Azure services available by region. (US EAST 3)
    * Workspace Name: provide a unique name for your workspace.
    * Pricing Tier: Trial (Premium - 14 days Free DBUs). You must select this option when creating your workspace or you will be charged. The workspace will suspend automatically after 14 days. When the trial is over, you can convert the workspace to Premium, but then you will be charged for your usage.
7. Select Review + Create.
8. Select Create.

The workspace creation takes a few minutes. During workspace creation, the Submitting deployment for Azure Databricks tile appears on the right side of the portal. You might need to scroll right on your dashboard to view the tile. There's also a progress bar displayed near the top of the screen. You can watch either area for progress.

### Cluster
* for this exercise 

What is a cluster?
The notebooks are backed by clusters, or networked computers, that work together to process your data. The first step is to create a cluster. 

Create a cluster
1. When your Azure Databricks workspace creation is complete, select the link to go to the resource.
2. Select Launch Workspace to open your Databricks workspace in a new tab.
3. In the left-hand menu of your Databricks workspace, select Clusters.
4. Select Create Cluster to add a new cluster.
5. Enter a name for your cluster. Use your name or initials to easily differentiate your cluster from those of your co-workers.
6. Select the Cluster Mode: Single Node.
7. Select the Databricks Runtime Version: Runtime: 7.3 LTS (Scala 2.12, Spark 3.0.1).
8. Under Autopilot Options, leave the auto-termination box checked and enter 45 in the text box (minutes of inactivity before the cluster shuts down).
9. Select the Node Type: F (with the free trial you cannot afford anything larger).
10. Select Create Cluster.

## Table
* upload the data to Azure Databricks using "Create -> Table"
* the data ('pageviews_by_second.tsv') is here:
https://mega.nz/folder/LAcGHJ4I#_uJ79tPCc4i5uWa0ps2itQ

### Notebook
* create a new notebook and attach it to the cluster (top right corner)
* start learning PySpark: try and test simple or more complex code

### Update your CV
* basic knowledge on Spark (Pyspark) and Microsoft Azure Databricks