# [Introduction to Apache Spark](http://spark.apache.org/) 🎇🎇 ✨✨

<center><img src="https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/Apache_Spark_logo-f31fc9de-9456-4351-b459-2fe24f92628b.png"></center>

* Apache Spark in an open-source distributed cluster-computing system designed to be fast and general-purpose.
* Created in 2009 at Berkeley's AMPLab by Matei Zaharia, the Spark codebase was donated in 2013 to the Apache Software Foundation. It has since become one of its most active projects.
* Spark provides high-level APIs in `Scala`, `Java`, `Python` and `R` and an optimized execution engine. On top of this 
technology, sit higher-lever tools including `Spark SQL`, `MLlib`, `GraphX` and `Spark Streaming`.

## What you will learn in this course 🧐🧐
This course will teach you some theory about the Spark framework, how it works and what advantages it has over other distributed computing frameworks. Here's the outline:

* Apache Spark
* Hadoop vs Spark
    * Faster through In-Memory computation
    * Simpler (high-level APIs) and execution engine optimisation
* Need for a DFS (Distributed File System)
* The Spark stack
    * Spark Core - the main functionnalities of the framework
    * Spark SQL - to handle structured data and run queries
    * GraphX - The visualisation tool of Spark
    * MLlib - The machine learning toolbox for Spark
    * Spark Streaming - An API to handle continuous inflow of data

* Spark's mechanics
    * DAG (Directed Acyclic Graphs) scheduling
    * Lazy execution
        * Transformations
        * Actions
    * Mixed language
* PySpark

## Ressources 📚📚
* The [official spark documentation](http://spark.apache.org/docs/latest/)
* [cluster-overview](https://spark.apache.org/docs/latest/cluster-overview.html)
* Interesting notes on clusters: [https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-cluster.html](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-cluster.html)
* You can take a look at [Spark Basics : RDDs,Stages,Tasks and DAG](https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454) but this covers concepts we haven't seen yet
* [Debugging PySpark](https://www.youtube.com/watch?v=McgG09XriEI)
- [What is Spark SQL](https://databricks.com/glossary/what-is-spark-sql)
- [Deep dive into Spark SQL's Catalyst Optimizer](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)
- [SparkSqlAstBuilder](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkSqlAstBuilder.html)
- [A Gentle Introduction to Stream Processing](https://medium.com/stream-processing/what-is-stream-processing-1eadfca11b97)


## Hadoop vs Spark 🐘🆚✨

- **Faster through In-Memory computation** ⚡ Because memory time access is much faster than disk access, Spark's In-Memory computation makes it much faster than Hadoop which relies on disk
- **Simpler (high-level APIs) and execution engine optimisation 🧸 :** 
    * Spark's high-level APIs combined with lazy computation *means we don't have to optimize each query. Spark execution engine will take care of building an optimized physical execution plan.*
    * Also, code you write in "local" mode will work on a cluster "out-of-the-box" thanks to Spark's higher level API.
    * That doesn't mean it will be easy to write Spark code, but Spark makes it much easier to write optimized code that will run at big data scale.

## The need for a distributed storage 🔀🔀

* If compute is distributed, all the machine needs to have access to the data, without a distributed storage that would be **very tedious**.
* Unlike Hadoop, Spark doesn't come with its own file system, but can interface with many existing ones, such as Hadoop Distributed File System (HDFS), Cassandra, Amazon S3 and many more...
* Spark can supports a pseudo-distributed local mode (for development or testing purposes), in this case, Spark is run on a single machine with one executor per CPU core and a distributed file storage is not required.

## The Spark Stack ✨⚙️

One of Spark's promises is to deliver a unified analytics system. On top of its powerful distributed processing engine (Spark Core), sits a collection of higher-level libraries that all benefit from the improvements of the core library, which are low latency, and lazy execution.

*That's true in general, but can suffer from some caveats, in particular Spark Streaming's performances can't rival those of Storm and Flink which are other framework for running streaming jobs.*

<center><img src="https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/spark-stack-oreilly-674376df-ecdf-45f2-8ef7-539393568c0e.png"></center>

Source: Learning Spark (O'Reilly - Holden Karau, Andy Konwinski, Patrick Wendell & Matei Zaharia)

### Spark Core 💖

Spark Core is the underlying general execution engine for the Spark platform that all other functionalities are built on top of.

It provides many core functionalities such as task dispatching and scheduling, memory management and basic I/O (input/output) functionalities, exposed through an application programming interface.

### Spark SQL 🔢

Spark module for structured data processing.

Spark SQL provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine. DataFrames are the other main data format in Spark. Spark DataFrames are column oriented, they have a data schema which describes the name and type of all the available columns. It allows for easier processing but adds contraints on the cleanliness and structure of the data.

Also they're called "DataFrames"

At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (such as Scala’s pattern matching and quasi quotes) to build an extensible query optimizer.

<center><img src="https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/Catalyst-Optimizer-diagram-152974c4-e1fc-4bb5-a788-c1ee71657ecd.png"></center>


Source: [https://databricks.com/glossary/catalyst-optimizer](https://databricks.com/glossary/catalyst-optimizer)

### GraphX 📊

Spark module for Graph computations.

GraphX is a graph computation engine built on top of Spark that enables users to interactively build, transform and reason about graph structured data at scale. It comes with a library of common visualizations.

### MLlib 🔮

Machine Learning library for Spark, inspired by Scikit-Learn (in particular, its pipelines system).

Historically a RDD-based API, it now comes with a DataFrame-based API that has become the primary API while the RDD-based API is now in [maintenance mode](https://spark.apache.org/docs/latest/ml-guide.html#announcement-dataframe-based-api-is-primary-api).

### Spark streaming 🌊

Spark module for stream processing.

Streaming, also called Stream Processing is used to query continuous data stream and process this data within a small time period from the time of receiving the data. This is the opposite of batch processing, which occurs at a previously scheduled time independently from the data influx.

Spark Streaming uses Spark Core's fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD transformations on those mini-batches of data. This design enables the same set of application code written for batch analytics to be used in streaming analytics, this comes at the cost of having to wait for the full mini-batch to be processed while alternatives like Apache Storm and Apache Flink process data by event and provide better speed.

## Spark mechanics & [cluster-overview](https://spark.apache.org/docs/latest/cluster-overview.html) ⚙️⚙️

> At a high level, every Spark application consists of a driver program that launches various parallel operations on a cluster. The driver program contains your application's main function and defines distributed datasets on the cluster, then applies operations on them.
- Learning Spark, page 14

<center><img src="https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/cluster-overview-273ddf73-9063-47bb-9060-e094443700eb.png" /></center>


## [DAG](https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454) (Directed Acyclic Graph) Scheduling 📅

* In order to distribute the execution among the worker nodes, Spark transforms the logical execution plan into a physical execution plan (how the computation will actually take place). While doing so, it implements an execution plan that will maximize performances, in particular avoiding moving data across the network, because as we've seen, network latency is the worse.

## Lazy Execution 😴

* A consequence of Spark's being so efficient when computing operations is lazy execution. This concept sets Spark (and therefore PySpark, the python API for using the Spark framework in python language) from classic python.

* meaning that an operation is not performed until an output is explicitly needed. For example, a join operation between two Spark dataframes will not immediately cause the join operation to be performed, which is how Pandas works. Instead, the join is performed once an output is added to the chain of operations to perform, such as displaying a sample of the resulting dataframe. One of the key differences between Pandas operations, where operations are eagerly performed and pulled into memory, is that PySpark operations are lazily performed and not pulled into memory until needed. One of the benefits of this approach is that the graph of operations to perform can be optimized before being sent to the cluster to execute.

* Python runs in what we call **eager execution**, meaning everytime you write some code and execute it, the operations your code is asking the computer to execute happen immediatly and the result is returned.

* In Spark things work a little differently, there are two types of operations: **transformations** and **actions**.

## Transformations 🧙

* **Transformations** are all operations that do not explicitely require the computer to return a result that should be stored, displayed or saved somewhere. These operations are only writen to the Graph waiting for an action to come up. For example, if you wish to calculate the frequency of all the the words in a set of text data, you may want to

1. isolate each word,
2. assign them a value of 1,
3. group elements by key (meaning the word itself) so all occurences of the same words are grouped together,
4. aggregate by summing the values associated with the words for each group.

* None of these operations require direct display or storage of a result, they just constitute in a roadmap plan that can be optimized whenever you request to see the result!

## Actions 🦸

* **Actions** are operations that explicitely ask the computer to display or store the result of an operation. Taking our previous example, if we ask to see the complete list of words with their frequency, then all the previously mentionned transformation will actually execute one after the other. It can be very computing efficient because Spark knows all the operations that need to be done and can therefore plan accordingly, but additionnaly, if you're not looking to see the full result but just an extract to make sure the code runs correctly for example, then Spark will only work enough to give you want you want and stop (think of it as testing a piece of code on a sample instead of the full dataset for speed reasons).

* Lazy execution makes Spark very computing efficient, but it also makes it harder to debug when something goes wrong. Because only some errors can be detected when running transformations because Spark does not actually try to run the code. You can later be met with a runtime error when using an action later (when the code actually starts running), and if the result you get is not the one you expected, you'll need to go back and inspect every transformation to find out where something went wrong.

* Seems intimidating I know, but you can always set up actions like displaying the first few lines of data after each transformation in order to run sanity checks on what you are doing. It's a fair price to pay to be able to work with huge amounts of data.

## Mixed language ☯️☯️

* Apache Spark is written in `Scala`, making wide usage of the `Java Virtual Machine` and can be interfaced with: `Scala` (primary), `Java`, `Python` (`PySpark`) and `R`.

* Because Spark is written in `Scala`, `PySpark`, the `Python` interface tends to follow Scala's principle, whether for small details like naming convention (PySpark's API is frequently not consistent with Python's standard good practices, for example using pascalCase instead of snake_case) or global programming paradigm like functional programming.

* The functional paradigm is particularly adapted for distributed computing as it uses concept like immutability.

## PySpark 🐍✨

* PySpark is the Python API for Apache Spark. Powerful, but some caveats:

    - *Not as exhaustive as other's python libraries for data analysis and modeling (pandas, sklearn, etc..)*
    - *Will be slower than these on small data*
    - *Mixed language (harder to debug, common to find resources for Scala and not Python)*

* Debugging PySpark is hard:

    - *Debugging Distributed systems is hard*
    - *Debugging mixed languages is hard*
    - *Lazy evaluation can be difficult to debug*

> 💡 If you want an API closer to pandas while maintaining fast big data processing capabilities, take a look at [koalas](https://github.com/databricks/koalas) (still in beta) and [handyspark](https://towardsdatascience.com/handyspark-bringing-pandas-like-capabilities-to-spark-dataframes-5f1bcea9039e) (more robust).
---

# Install Spark (easy way : on Colab)

In [None]:
# !ls
# As you see, we don't have yet spark installed ! 
!apt-get update

In [8]:
# https://spark.apache.org/docs/latest/
# Spark runs on Java 8/11, Scala 2.12, Python 3.6+ and R 3.5+. Java 8 prior to version 8u92 support is deprecated as of Spark 3.0.0.!
# https://openjdk.java.net/
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Q : What's the difference : openjdk-normal Vs. openjdk-headless ? 
# R : openjdk-normal permet de créer des prog avec GUI. Ns n'avons pas besoin, ns allons lancer des calculs 
# https://stackoverflow.com/questions/24280872/difference-between-openjdk-6-jre-openjdk-6-jre-headless-openjdk-6-jre-lib

In [9]:
# Le dépôt des archives : 
# https://archive.apache.org/dist/spark/

!wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

--2021-12-21 23:03:57--  https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 224374704 (214M) [application/x-gzip]
Saving to: ‘spark-3.1.1-bin-hadoop2.7.tgz’


2021-12-21 23:04:07 (23.8 MB/s) - ‘spark-3.1.1-bin-hadoop2.7.tgz’ saved [224374704/224374704]



In [10]:
# On décompresse
!tar xf /content/spark-3.1.1-bin-hadoop2.7.tgz

In [None]:
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
# On décompresse
!tar xf content/spark-2.3.1-bin-hadoop2.7.tgz

In [11]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 42 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 43.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=8596828c3441a27474102dc28f6e7b97f6073f86f602d26ad89ee02f0c7a6651
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


* `findspark` pkg a pr obj : Find Spark Home, and initialize by adding `pyspark` to `sys.path`
    ```
    import findspark
    findspark.init()
    ```
    <https://stackoverflow.com/questions/36799643/pyspark-sparkcontext-name-error-sc-in-jupyter>

In [12]:
!pip install findspark

Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2


## Setup environment

In [13]:
# We verify that spark is installed ! 
!ls

drive  sample_data  spark-3.1.1-bin-hadoop2.7  spark-3.1.1-bin-hadoop2.7.tgz


In [14]:
# We add env var
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7" 

In [2]:
!pip install pyspark
# import findspark
# findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate() 
spark

Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s  eta 0:00:01    |███▍                            | 29.8 MB 2.3 MB/s eta 0:01:48     |████████████                    | 106.1 MB 681 kB/s eta 0:04:17     |█████████████                   | 113.7 MB 2.7 MB/s eta 0:01:03     |█████████████                   | 114.3 MB 2.7 MB/s eta 0:01:03     |███████████████▍                | 135.5 MB 2.4 MB/s eta 0:01:02     |██████████████████              | 157.7 MB 530 kB/s eta 0:03:54     |██████████████████████          | 193.6 MB 2.1 MB/s eta 0:00:42     |████████████████████████▉       | 217.9 MB 2.7 MB/s eta 0:00:24     |████████████████████████████    | 245.7 MB 2.9 MB/s eta 0:00:13     |█████████████████████████████▉  | 262.3 MB 2.7 MB/s eta 0:00:07     |██████████████████████████████▋ | 268.8 MB 2.7 MB/s eta 0:00:05
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |██████

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/10 00:41:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/07/10 00:41:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
from google.colab import drive
drive.mount('/content/drive')
from google.colab import files
files.upload()

Mounted at /content/drive


In [None]:
import os
os.getcwd()

In [None]:
os.listdir('/content/drive/MyDrive/data')

[]

# [Databricks Community](https://community.cloud.databricks.com/) 🧱🧱

## What you will learn in this course 🧐🧐
* This course is a demo that will introduce to you one of the main data formats in spark which is spark RDDs (Resilient Distributed Datasets), we will walk you through how to use this low level data format using pyspark.
Here's the outline:

* Databricks
    * Login Page
    * Homepage
    * Workspace
    * Create Folder
    * Upload Notebook
    * Notebook View

* Databricks is a cloud service provider which makes available clusters of machines with the spark framework already installed on them. Spark can be a real pain to set up, but gets amazing results once it's all up and running. We'll use databricks here so we can all work on a standardized environment!

* We'll use the community edition which is free, but limits the number and performance of the machines in our cluster. However this is not going to change a thing in terms of the code we'll right, whatever we'll learn here can scale up by connecting to a bigger cluster.

* Here's a walkthrough of what you should do once you are logged in ;)

### Login Page 🔑
![](https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/Databricks/databricks_login.PNG)

### Homepage 🏠
![](https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/Databricks/databricks_homepage.PNG)

### Workspace 👷
![](https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/Databricks/databricks_workspace.PNG)

### Create Folder 📁
![](https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/Databricks/databricks_create_folder.PNG)

### Upload Notebook 📤
![](https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/Databricks/databricks_import_notebook.PNG)

### Notebook View 📝
![](https://full-stack-assets.s3.eu-west-3.amazonaws.com/images/Databricks/databricks_notebook_view.PNG)

# Install Spark (en local)

In [1]:
import pandas as pd
import requests
import json
# import findspark
# findspark.init(spark_home='/home/sayf/hadoop/spark')
# findspark.init(spark_home=r'C:/Users/bejao/AppData/Local/spark/spark-2.1.0-bin-hadoop2.7')

# Chargement de Spark : [SparkSession](https://sparkbyexamples.com/pyspark/pyspark-what-is-sparksession/) & [SparkContext](https://sparkbyexamples.com/spark/how-to-create-a-sparksession-and-spark-context/)

In [1]:
from pyspark.sql import SparkSession
# spark = SparkSession.builder.getOrCreate()
# local[*] => On utilise les coeurs de la marchine
spark = SparkSession.builder.master("local[*]") \
                    .appName('spark') \
                    .getOrCreate()
# spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
sc = spark.sparkContext.setLogLevel('OFF')
# from pyspark.sql.types import IntegerType, StringType, DoubleType, BooleanType, TimestampType, StructField, StructType
# import pyspark.sql.functions as F  

path = 'file:///databricks/driver/Crimes-2001_to_present.txt'
path = 'file:///mnt/c/Users/bejao/OneDrive/data/Crimes-2001_to_present.txt'
spark

22/09/01 14:50:18 WARN Utils: Your hostname, DESKTOP-G4OOFUM resolves to a loopback address: 127.0.1.1; using 172.29.192.131 instead (on interface eth0)
22/09/01 14:50:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/01 14:50:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
SparkSession.getActiveSession()
# spark.stop()

# PySpark - Dataframes 🗄️🗄️

## What will you learn in this course? 🧐🧐

* Running SQL queries against DataFrames
    * Select columns in Spark DataFrames
    * Actions
        * `.show()`
        * `.printSchema()`
        * `.take()`
        * `.collect()`
        * `.count()`
        * `.describe()`
        * `display()`
        * `.toPandas()`
        * `..write()`
    * Transformations
        * `.na`
        * `.fill()`
        * `.drop()`
        * `.isNull()`
        * `.replace()`
        * `.sql()`
        * `.select()`
        * `.alias(...)`
        * `.drop(...)`
        * `.limit()`
        * `.filter()`
        * `.selectExpr()`
        * `.dropDuplicates()`
        * `.distinct()`
        * `.orderBy()`
        * `.groupBy()`
        * `.withColumn()`
        * `.withColumnRenamed()`
        * Chaining everything together
        
* Some differences with pandas' DataFrames

# Download and [preprocessing](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#Viewing-Data) [Chicago's Reported Crime Data](https://data.cityofchicago.org/)

In [17]:
# Download the file
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
# Rename the file 
!mv rows.csv?accessType=DOWNLOAD reported-crimes.csv
!ls

--2021-12-21 23:07:41--  https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.68.26, 52.206.140.205, 52.206.140.199
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.68.26|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [      <=>           ]   1.64G  3.39MB/s    in 8m 53s  

2021-12-21 23:16:35 (3.15 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [1760142265]

drive		     sample_data		spark-3.1.1-bin-hadoop2.7.tgz
reported-crimes.csv  spark-3.1.1-bin-hadoop2.7


# Read a huge csv : [doc offic](https://spark.apache.org/docs/latest/sql-data-sources-csv.html)

## Lecture avec Python : 

In [3]:
import pandas as pd
pd.read_csv(path)
# Le lecture est très coûteuse en temps et en mémoire. Il faut passer par un outil dédié à la Big Data => Spark

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Longitude,Location,Historical Wards 2003-2015,Zip Codes,Community Areas,Census Tracts,Wards,Boundaries - ZIP Codes,Police Districts,Police Beats
0,11727746,JC312349,06/18/2019 11:55:00 PM,0000X W 79TH ST,0484,BATTERY,PRO EMP HANDS NO/MIN INJURY,CTA STATION,False,False,...,,,,,,,,,,
1,11728171,JC312895,06/18/2019 11:55:00 PM,044XX S CHRISTIANA AVE,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,...,,,,,,,,,,
2,11728165,JC312901,06/18/2019 11:55:00 PM,044XX S CHRISTIANA AVE,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,...,,,,,,,,,,
3,11727579,JC312360,06/18/2019 11:50:00 PM,076XX S COTTAGE GROVE AVE,0470,PUBLIC PEACE VIOLATION,RECKLESS CONDUCT,STREET,True,False,...,,,,,,,,,,
4,11727572,JC312333,06/18/2019 11:50:00 PM,040XX W SCHOOL ST,0460,BATTERY,SIMPLE,BAR OR TAVERN,False,False,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6899945,1403263,G117094,01/01/2001 12:00:00 AM,084XX S PRAIRIE AV,1120,DECEPTIVE PRACTICE,FORGERY,BANK,False,False,...,-87.617741,"(41.741163326, -87.617740827)",31.0,21546.0,40.0,406.0,32.0,61.0,20.0,241.0
6899946,1352564,G055473,01/01/2001 12:00:00 AM,018XX N TALMAN AV,0820,THEFT,$500 AND UNDER,STREET,False,False,...,-87.693605,"(41.914955409, -87.693605188)",24.0,22535.0,23.0,180.0,41.0,1.0,7.0,171.0
6899947,1313893,G000035,01/01/2001 12:00:00 AM,027XX W NELSON ST,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,False,False,...,-87.697098,"(41.936549915, -87.697097823)",24.0,21538.0,22.0,467.0,20.0,39.0,7.0,174.0
6899948,1374123,G083329,01/01/2001 12:00:00 AM,045XX S LAPORTE AV,0820,THEFT,$500 AND UNDER,STREET,False,False,...,-87.747061,"(41.810679064, -87.747061323)",35.0,22268.0,53.0,604.0,28.0,7.0,13.0,112.0


## Lecture avec Spark

[read.csv](https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv), [read.json](https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json), [read.parquet](https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.parquet), option [InferSchema](https://www.learntospark.com/2020/10/spark-optimization-technique-inferschema.html)

In [21]:
# Lecture rapide (sans spécifier les options nécessaires)
# df = spark.read.csv(path)

# Lecture avec l'option 'header=True' (pour rajouter les noms de colonnes à votre df)
# df = spark.read.csv(path, header=True)
# df.rdd.getNumPartitions()

# Lecture avec l'option 'inferSchema' (plus coûteuse : 30s). Elle permet de transformer les colonnes en types plus précis : int  / boolean / string / double...
# bien sûr spark trouve les types uniquement si le fichier d'origine permet de les trouver de manière simple

df = spark.read.csv(path, header=True, inferSchema=True)

df = spark.read.format("csv").option("header","true")\
                             .option("delimiter", ",")\
                             .load(path)

# Lire à partir de l'API ? 
url = 'https://data.cityofchicago.org/resource/ijzp-q8t2.json'
resp = requests.get(url=url).json()
df = spark.createDataFrame(resp)

# Boucker sur ttes les années et concatener les dfs
import requests
import json
year=2001
data = {}
while year <= 2022:
    print(year)
    r = requests.get(f'https://data.cityofchicago.org/resource/ijzp-q8t2.json?$limit=9223372036854775807&$where=year={year}').json()
    data[year]= spark.createDataFrame(r)
    year +=1
data

df_concat = data.get(2001).union(data.get(2002)).union(data.get(2003))
df_concat

from functools import reduce
from pyspark.sql import DataFrame
year = range(2001,2023)
dfs = []
for y in year:
    dfs.append(data.get(y))
dfs
# dfs = [data.get(2001), data.get(2002)]
df_ = reduce(DataFrame.union, dfs)


## Lire un fichier Excel  
df = spark.read.format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/FileStore/tables/new_ville.xlsx") \
df.show()

# df #  Affichage intelligent, uniquement les cols et leur type, contexte Big Data (pas le df en entier )
# df.show(5)
df.show(1, vertical=True)
# https://sqlrelease.com/show-full-column-content-in-spark
# df.show(1, vertical=True, truncate=False)


df.dtypes
# type(df.dtypes)
pd.DataFrame(df.dtypes, columns=['col', 'type']).value_counts('type')

type
int        16
string     10
boolean     2
double      2
dtype: int64

# [Affichage comme dans un notebook natif](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#Viewing-Data)

In [4]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

22/01/07 20:29:16 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location,Historical Wards 2003-2015,Zip Codes,Community Areas,Census Tracts,Wards,Boundaries - ZIP Codes,Police Districts,Police Beats
11727746,JC312349,06/18/2019 11:55:...,0000X W 79TH ST,0484,BATTERY,PRO EMP HANDS NO/...,CTA STATION,False,False,623,6,6,44,08B,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11728171,JC312895,06/18/2019 11:55:...,044XX S CHRISTIAN...,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,821,8,14,58,14,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11728165,JC312901,06/18/2019 11:55:...,044XX S CHRISTIAN...,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,821,8,14,58,14,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11727579,JC312360,06/18/2019 11:50:...,076XX S COTTAGE G...,0470,PUBLIC PEACE VIOL...,RECKLESS CONDUCT,STREET,True,False,624,6,6,69,24,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11727572,JC312333,06/18/2019 11:50:...,040XX W SCHOOL ST,0460,BATTERY,SIMPLE,BAR OR TAVERN,False,False,1731,17,30,16,08B,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11727655,JC312367,06/18/2019 11:45:...,116XX S PEORIA ST,031A,ROBBERY,ARMED: HANDGUN,STREET,False,False,524,5,34,53,03,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11727595,JC312338,06/18/2019 11:45:...,067XX N SHERIDAN RD,0910,MOTOR VEHICLE THEFT,AUTOMOBILE,STREET,False,False,2432,24,49,1,07,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11727611,JC312327,06/18/2019 11:39:...,002XX E 121ST PL,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,532,5,9,53,14,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11727571,JC312317,06/18/2019 11:32:...,059XX S PAULINA ST,051A,ASSAULT,AGGRAVATED: HANDGUN,RESIDENCE PORCH/H...,False,False,714,7,15,67,04A,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
11727883,JC312602,06/18/2019 11:30:...,025XX S TRUMBULL AVE,0810,THEFT,OVER $500,STREET,False,False,1024,10,22,30,06,,,2019,06/25/2019 04:19:...,,,,,,,,,,,


# Q : Déterminer le bon [schéma](https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/)

* Laisser Spark déterminer (pour nous) les bons types des cols est très coûteux parcq le df d'origine contient + de 5M rows => `spark.read.csv(path, header=True, InferSchema=True)` 
* Ns allons le faire manuellement (suivant notre intuition)

In [8]:
# Par défaut, Spark considère toutes les cols en String
# df.dtypes
# pd.DataFrame(df.dtypes, columns=['col', 'type']).T
pd.DataFrame(df.dtypes, columns=['col', 'type']).value_counts('type')
# df.printSchema()


type
string    30
dtype: int64

In [3]:
from pyspark.sql.types import IntegerType, StringType, DoubleType, BooleanType, TimestampType, StructField, StructType 

In [4]:
# cols = df.columns
cols  = ['ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type', 'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude', 'Location', 'Historical Wards 2003-2015', 'Zip Codes', 'Community Areas', 'Census Tracts', 'Wards', 'Boundaries - ZIP Codes', 'Police Districts', 'Police Beats']

types = [IntegerType(), StringType(), StringType(), StringType(), StringType(), StringType(), StringType(), StringType(), BooleanType(), BooleanType(), IntegerType(), IntegerType(), IntegerType(), IntegerType(), StringType(), IntegerType(), IntegerType(), IntegerType(), StringType(), DoubleType(), DoubleType(), StringType(), IntegerType(), IntegerType(), IntegerType(), IntegerType(), IntegerType(), IntegerType(), IntegerType(), IntegerType()]

# les cols `Dates` sont conservés en StringType et seront convertit en TimestampType parce qu'elle nécessitent un traitement spécifique.

# On crée une liste de tuples avec `zip`
cols_types = list(zip(cols, types))

# On ajoute `True` dans les tuples à l'aide de comprehension list
schema = StructType([ StructField(i[0], i[1], True) for i in cols_types])
schema

StructType([StructField('ID', IntegerType(), True), StructField('Case Number', StringType(), True), StructField('Date', StringType(), True), StructField('Block', StringType(), True), StructField('IUCR', StringType(), True), StructField('Primary Type', StringType(), True), StructField('Description', StringType(), True), StructField('Location Description', StringType(), True), StructField('Arrest', BooleanType(), True), StructField('Domestic', BooleanType(), True), StructField('Beat', IntegerType(), True), StructField('District', IntegerType(), True), StructField('Ward', IntegerType(), True), StructField('Community Area', IntegerType(), True), StructField('FBI Code', StringType(), True), StructField('X Coordinate', IntegerType(), True), StructField('Y Coordinate', IntegerType(), True), StructField('Year', IntegerType(), True), StructField('Updated On', StringType(), True), StructField('Latitude', DoubleType(), True), StructField('Longitude', DoubleType(), True), StructField('Location', S

In [5]:
cols_types

[('ID', IntegerType()),
 ('Case Number', StringType()),
 ('Date', StringType()),
 ('Block', StringType()),
 ('IUCR', StringType()),
 ('Primary Type', StringType()),
 ('Description', StringType()),
 ('Location Description', StringType()),
 ('Arrest', BooleanType()),
 ('Domestic', BooleanType()),
 ('Beat', IntegerType()),
 ('District', IntegerType()),
 ('Ward', IntegerType()),
 ('Community Area', IntegerType()),
 ('FBI Code', StringType()),
 ('X Coordinate', IntegerType()),
 ('Y Coordinate', IntegerType()),
 ('Year', IntegerType()),
 ('Updated On', StringType()),
 ('Latitude', DoubleType()),
 ('Longitude', DoubleType()),
 ('Location', StringType()),
 ('Historical Wards 2003-2015', IntegerType()),
 ('Zip Codes', IntegerType()),
 ('Community Areas', IntegerType()),
 ('Census Tracts', IntegerType()),
 ('Wards', IntegerType()),
 ('Boundaries - ZIP Codes', IntegerType()),
 ('Police Districts', IntegerType()),
 ('Police Beats', IntegerType())]

In [6]:
path = 'file:///mnt/c/Users/bejao/OneDrive/data/Crimes-2001_to_present.txt'
df = spark.read.csv(path, header=True, schema=schema)
df
# pd.DataFrame(df.dtypes, columns=['col', 'type']).value_counts('type')

DataFrame[ID: int, Case Number: string, Date: string, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: boolean, Domestic: boolean, Beat: int, District: int, Ward: int, Community Area: int, FBI Code: string, X Coordinate: int, Y Coordinate: int, Year: int, Updated On: string, Latitude: double, Longitude: double, Location: string, Historical Wards 2003-2015: int, Zip Codes: int, Community Areas: int, Census Tracts: int, Wards: int, Boundaries - ZIP Codes: int, Police Districts: int, Police Beats: int]

In [None]:
StructType([StructField(ID,StringType,true),
    StructField(Case Number,StringType,true),StructField(Date,TimestampType,true),StructField(Block,StringType,true),StructField(IUCR,StringType,true),StructField(Primary Type,StringType,true),StructField(Description,StringType,true),StructField(Location Description,StringType,true),StructField(Arrest,StringType,true),StructField(Domestic,BooleanType,true),StructField(Beat,StringType,true),StructField(District,StringType,true),StructField(Ward,StringType,true),StructField(Community Area,StringType,true),StructField(FBI Code,StringType,true),StructField(X Coordinate,StringType,true),StructField(Y Coordinate,StringType,true),StructField(Year,IntegerType,true),StructField(Updated On,StringType,true),StructField(Latitude,DoubleType,true),StructField(Longitude,DoubleType,true),StructField(Location,StringType,true),StructField(Historical Wards 2003-2015,StringType,true),StructField(Zip Codes,StringType,true),StructField(Community Areas,StringType,true),StructField(Census Tracts,StringType,true),StructField(Wards,StringType,true),StructField(Boundaries - ZIP Codes,StringType,true),StructField(Police Districts,StringType,true),StructField(Police Beats,StringType,true))])


type
string     20
int         6
boolean     1
float       1
dtype: int64

# head, tail, take, limit, collect, show ? 

In [11]:
# limit() retourne un df-spark que ns pouvons afficher avec show, que je peux convertir en df-pandas => affichage plus agréable 
# https://sparkbyexamples.com/pyspark/convert-pyspark-dataframe-to-pandas/
# df.limit(5).toPandas()

# pd.DataFrame(df.head(5), columns=df.columns)
# df.head(5) #  contrairement à show(), head() affiche une liste de Row
# df.take(5) #  idem
# type(df.head(5)) #  liste de row (à la spark)
# type(df.head(5)[0])

# df.limit(5).collect() #  collect après limit retourne une liste de Row
df.collect() # ne pas utiliser parcq retourner tt le dataset (si vs avez du vrai Big Data)

[Row(ID=11727746, Case Number='JC312349', Date='06/18/2019 11:55:00 PM', Block='0000X W 79TH ST', IUCR='0484', Primary Type='BATTERY', Description='PRO EMP HANDS NO/MIN INJURY', Location Description='CTA STATION', Arrest=False, Domestic=False, Beat=623, District=6, Ward=6, Community Area=44, FBI Code='08B', X Coordinate=None, Y Coordinate=None, Year=2019, Updated On='06/25/2019 04:19:57 PM', Latitude=None, Longitude=None, Location=None, Historical Wards 2003-2015=None, Zip Codes=None, Community Areas=None, Census Tracts=None, Wards=None, Boundaries - ZIP Codes=None, Police Districts=None, Police Beats=None),
 Row(ID=11728171, Case Number='JC312895', Date='06/18/2019 11:55:00 PM', Block='044XX S CHRISTIANA AVE', IUCR='1320', Primary Type='CRIMINAL DAMAGE', Description='TO VEHICLE', Location Description='STREET', Arrest=False, Domestic=False, Beat=821, District=8, Ward=14, Community Area=58, FBI Code='14', X Coordinate=None, Y Coordinate=None, Year=2019, Updated On='06/25/2019 04:19:57 P

In [13]:
# df.tail(5) 
import pandas as pd
pd.DataFrame(df.tail(5), columns=df.columns)

                                                                                

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Longitude,Location,Historical Wards 2003-2015,Zip Codes,Community Areas,Census Tracts,Wards,Boundaries - ZIP Codes,Police Districts,Police Beats
0,1403263,G117094,01/01/2001 12:00:00 AM,084XX S PRAIRIE AV,1120,DECEPTIVE PRACTICE,FORGERY,BANK,False,False,...,-87.617741,"(41.741163326, -87.617740827)",31,21546,40,406,32,61,20,241
1,1352564,G055473,01/01/2001 12:00:00 AM,018XX N TALMAN AV,820,THEFT,$500 AND UNDER,STREET,False,False,...,-87.693605,"(41.914955409, -87.693605188)",24,22535,23,180,41,1,7,171
2,1313893,G000035,01/01/2001 12:00:00 AM,027XX W NELSON ST,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,False,False,...,-87.697098,"(41.936549915, -87.697097823)",24,21538,22,467,20,39,7,174
3,1374123,G083329,01/01/2001 12:00:00 AM,045XX S LAPORTE AV,820,THEFT,$500 AND UNDER,STREET,False,False,...,-87.747061,"(41.810679064, -87.747061323)",35,22268,53,604,28,7,13,112
4,1329588,G025242,01/01/2001 12:00:00 AM,061XX S COTTAGE GROVE AV,820,THEFT,$500 AND UNDER,STREET,False,False,...,-87.606155,"(41.783961993, -87.606155349)",53,22260,9,471,4,60,18,275


# shape

In [17]:
# df.columns
# type(df.columns) #  liste
# len(df.columns)
df.count()   # nb de ligne : 6899950

                                                                                

6899950

# [cache or persist](https://stackoverflow.com/questions/26870537/what-is-the-difference-between-cache-and-persist?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa) ? 

* Comme ns allons réaliser de nombreuses opé sur le `df`, il vaut mieux le mettre en `cache` ce qui permettra de gagner du temps de calcul. Le `df` sera stocké en mémoire vive
* The difference between `cache` and `persist` is purely syntactic. `cache` is a synonym of `persist` or `persist(MEMORY_ONLY)`. But Persist() We can save the intermediate results in 5 storage levels : `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, `MEMORY_AND_DISK_SER`, `DISK_ONLY`

In [18]:
df.cache()

DataFrame[ID: int, Case Number: string, Date: string, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: boolean, Domestic: boolean, Beat: int, District: int, Ward: int, Community Area: int, FBI Code: string, X Coordinate: int, Y Coordinate: int, Year: int, Updated On: string, Latitude: double, Longitude: double, Location: string, Historical Wards 2003-2015: int, Zip Codes: int, Community Areas: int, Census Tracts: int, Wards: int, Boundaries - ZIP Codes: int, Police Districts: int, Police Beats: int]

# [describe](https://databricks.com/fr/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html) & summary

* L'opé suivante va mettre approx 3 min et réaliser 14 `tasks` !
* Que signifie `stage` dans Spark ?  : [sackoverflow](https://stackoverflow.com/questions/32994980/what-does-stage-mean-in-the-spark-logs)
* Que signifie tasks  : [mallikarjuna](https://mallikarjuna_g.gitbooks.io/spark/content/spark-taskscheduler-tasks.html), [stackoverflow](https://stackoverflow.com/questions/25276409/what-is-a-task-in-spark-how-does-the-spark-worker-execute-the-jar-file)

In [19]:
df_desc = df.describe()
df_desc
# df_desc.show()
# df_desc = df.describe().toPandas()
# df_desc.set_index('summary').T 


                                                                                

DataFrame[summary: string, ID: string, Case Number: string, Date: string, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Beat: string, District: string, Ward: string, Community Area: string, FBI Code: string, X Coordinate: string, Y Coordinate: string, Year: string, Updated On: string, Latitude: string, Longitude: string, Location: string, Historical Wards 2003-2015: string, Zip Codes: string, Community Areas: string, Census Tracts: string, Wards: string, Boundaries - ZIP Codes: string, Police Districts: string, Police Beats: string]

* L'opé suivante va mettre approx 7 min et réaliser 14 `tasks` !

In [36]:
df.summary()

DataFrame[summary: string, ID: string, Case Number: string, Date: string, Block: string, IUCR: string, Primary Type: string, Description: string, Location Description: string, Arrest: string, Domestic: string, Beat: string, District: string, Ward: string, Community Area: string, FBI Code: string, X Coordinate: string, Y Coordinate: string, Year: string, Updated On: string, Latitude: string, Longitude: string, Location: string, Historical Wards 2003-2015: string, Zip Codes: string, Community Areas: string, Census Tracts: string, Wards: string, Boundaries - ZIP Codes: string, Police Districts: string, Police Beats: string]

# Missing values : [sparkByExamples](https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/), [stackoverflow](https://stackoverflow.com/questions/44413132/count-the-number-of-missing-values-in-a-dataframe-spark)

In [51]:
# df.summary('count').toPandas().T

# [{c:df.filter(df[c].isNull()).count()} for c in df.columns]

from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]   ).show()
# df_na.T.sort_values(by = 0, ascending=False)/df.count() * 100

Unnamed: 0,summary,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,...,Longitude,Location,Historical Wards 2003-2015,Zip Codes,Community Areas,Census Tracts,Wards,Boundaries - ZIP Codes,Police Districts,Police Beats
0,count,6899950,6899946,6899950,6899950,6899950,6899950,6899950,6894787,6899950,...,6811819,6811819,6792027,6811819,6794741,6796898,6794853,6794788,6795817,6795840


# Unique values : [SparkByExample](https://sparkbyexamples.com/pyspark/pyspark-distinct-to-drop-duplicates/), [stackoverflow](https://stackoverflow.com/questions/39383557/show-distinct-column-values-in-pyspark-dataframe)

In [64]:
df_unique = df.select('*').distinct()
# .show(10, truncate=False)


[{c:df.select(c).distinct().count()} for c in df.columns]

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Longitude,Location,Historical Wards 2003-2015,Zip Codes,Community Areas,Census Tracts,Wards,Boundaries - ZIP Codes,Police Districts,Police Beats
0,6899950.0,,,,,,,,,,...,,,,,,,,,,
1,,6899536.0,,,,,,,,,...,,,,,,,,,,
2,,,2773693.0,,,,,,,,...,,,,,,,,,,
3,,,,60331.0,,,,,,,...,,,,,,,,,,
4,,,,,402.0,,,,,,...,,,,,,,,,,
5,,,,,,35.0,,,,,...,,,,,,,,,,
6,,,,,,,380.0,,,,...,,,,,,,,,,
7,,,,,,,,180.0,,,...,,,,,,,,,,
8,,,,,,,,,2.0,,...,,,,,,,,,,
9,,,,,,,,,,2.0,...,,,,,,,,,,


# [value_counts](https://napsterinblue.github.io/notes/spark/sparksql/value_counts/)   [stackoverflow](https://stackoverflow.com/questions/51063624/whats-the-equivalent-of-pandas-value-counts-in-pyspark)

In [88]:
# import pyspark.sql.functions as F     

import pyspark.sql.functions as func
count_cl = df.count()
df \
.groupBy('Primary Type') \
.count() \
.withColumn('%', func.round((func.col('count')/count_cl)*100,2)) \
.orderBy('count', ascending=False)

df_value_counts.show(5)

df.groupby('category').agg(
    (F.count('Primary Type')).alias('count'),
    (F.count('Primary Type') / df.count()).alias('percentage')
)


22/01/08 01:39:27 WARN memory.MemoryStore: Not enough space to cache rdd_19_3 in memory! (computed 29.2 MiB so far)
22/01/08 01:39:27 WARN memory.MemoryStore: Not enough space to cache rdd_19_2 in memory! (computed 29.7 MiB so far)
22/01/08 01:39:27 WARN memory.MemoryStore: Not enough space to cache rdd_19_4 in memory! (computed 29.4 MiB so far)
22/01/08 01:39:27 WARN memory.MemoryStore: Not enough space to cache rdd_19_5 in memory! (computed 29.1 MiB so far)
22/01/08 01:39:28 WARN memory.MemoryStore: Not enough space to cache rdd_19_6 in memory! (computed 29.4 MiB so far)
22/01/08 01:39:28 WARN memory.MemoryStore: Not enough space to cache rdd_19_7 in memory! (computed 29.6 MiB so far)
22/01/08 01:39:28 WARN memory.MemoryStore: Not enough space to cache rdd_19_8 in memory! (computed 29.6 MiB so far)
22/01/08 01:39:28 WARN memory.MemoryStore: Not enough space to cache rdd_19_9 in memory! (computed 29.6 MiB so far)
22/01/08 01:39:28 WARN memory.MemoryStore: Not enough space to cache rdd

+---------------+-------+
|   Primary Type|  count|
+---------------+-------+
|          THEFT|1453743|
|        BATTERY|1260831|
|CRIMINAL DAMAGE| 786698|
|      NARCOTICS| 719559|
|        ASSAULT| 430537|
+---------------+-------+
only showing top 5 rows



22/01/08 17:06:36 WARN spark.HeartbeatReceiver: Removing executor driver with no recent heartbeats: 45942743 ms exceeds timeout 120000 ms
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.NettyRpcEnv: Ignored message: true
22/01/08 17:06:36 WARN netty.N

# Working with columns

**Q : Accéder à la col `District` et `Case Number`**

In [79]:
# df.columns

## Accés à une seule col  
# df.District, type(_)
# df['Case Number'], type(_)

## L'ensemble de ces écritures sont équivalentes 
# df.select('District')
# df.select('Case Number')
# df.select(df.District)
# df.select(F.col('District'))

## Accés à plusieurs col  
df.select(['District', 'Case Number'])
df.select('District', 'Case Number')

District,Case Number
6,JC312349
8,JC312895
8,JC312901
6,JC312360
17,JC312333
5,JC312367
24,JC312338
5,JC312327
7,JC312317
10,JC312602


**Q : Display only the first 5 rows of the column name IUCR**

In [62]:
df.select('IUCR').show(5)

+----+
|IUCR|
+----+
|0484|
|1320|
|1320|
|0470|
|0460|
+----+
only showing top 5 rows



**Q : Display only the first 4 rows of the column names Case Number, Date and Arrest**

In [90]:
df.select(['Case Number', 'Date', 'Arrest']).show(4)

+-----------+--------------------+------+
|Case Number|                Date|Arrest|
+-----------+--------------------+------+
|   JC312349|06/18/2019 11:55:...| false|
|   JC312895|06/18/2019 11:55:...| false|
|   JC312901|06/18/2019 11:55:...| false|
|   JC312360|06/18/2019 11:50:...|  true|
+-----------+--------------------+------+
only showing top 4 rows



**Q : Add a column with name One, with entries all 1s [SparkByExamples](https://sparkbyexamples.com/spark/spark-add-new-column-to-dataframe/)  [stackoverflow](https://stackoverflow.com/questions/55382401/how-to-add-multiple-empty-columns-to-a-pyspark-dataframe-at-specific-locations)**

In [76]:
# df = df.withColumn('One', F.lit(1))   # `lit` pour dire `litteral` 
# df.select('District', 'One').show(5)
# df
df.One.dtype

Column<'One[dtype]'>

**Q : Remove the column One Or multiple columns**  [SparkByExamples](https://sparkbyexamples.com/pyspark/pyspark-drop-column-from-dataframe/)

In [92]:
# df = df.drop('One')
# df

## drop multiple columns 
cols = ["ID", "Case Number", "Block"]
df.drop(*cols)

Date,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location,Historical Wards 2003-2015,Zip Codes,Community Areas,Census Tracts,Wards,Boundaries - ZIP Codes,Police Districts,Police Beats
06/18/2019 11:55:...,0484,BATTERY,PRO EMP HANDS NO/...,CTA STATION,False,False,623,6,6,44,08B,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:55:...,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,821,8,14,58,14,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:55:...,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,821,8,14,58,14,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:50:...,0470,PUBLIC PEACE VIOL...,RECKLESS CONDUCT,STREET,True,False,624,6,6,69,24,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:50:...,0460,BATTERY,SIMPLE,BAR OR TAVERN,False,False,1731,17,30,16,08B,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:45:...,031A,ROBBERY,ARMED: HANDGUN,STREET,False,False,524,5,34,53,03,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:45:...,0910,MOTOR VEHICLE THEFT,AUTOMOBILE,STREET,False,False,2432,24,49,1,07,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:39:...,1320,CRIMINAL DAMAGE,TO VEHICLE,STREET,False,False,532,5,9,53,14,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:32:...,051A,ASSAULT,AGGRAVATED: HANDGUN,RESIDENCE PORCH/H...,False,False,714,7,15,67,04A,,,2019,06/25/2019 04:19:...,,,,,,,,,,,
06/18/2019 11:30:...,0810,THEFT,OVER $500,STREET,False,False,1024,10,22,30,06,,,2019,06/25/2019 04:19:...,,,,,,,,,,,


# Working with dates  [SparkByExamples](https://sparkbyexamples.com/pyspark/pyspark-to_date-convert-string-to-date-format/), [stackoverflow](https://stackoverflow.com/questions/38080748/convert-pyspark-string-to-date-format)

In [123]:
dt = spark.createDataFrame([('2019-12-25 13:30:00',)], ['Christmas'])
dt.show()
dt.dtypes


+-------------------+
|          Christmas|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+



[('Christmas', 'string')]

## **Q : Convertir la col `Christmas`  2019-12-25 13:30:00 en `date` et en `timestamp`**

In [126]:
# Le nom de la col est trop long, il faut mettre un alias (comme en SQL)
# dt.select(F.to_date(dt.Christmas, 'yyyy-MM-dd HH:mm:ss'))  
# dt.select(F.to_date(dt.Christmas, 'yyyy-MM-dd HH:mm:ss').alias('Christmas date'))

# On sélectionne les cols : `Christmas`, `Christmas date`, `Christmas timestamp`
# dt.select([
#     dt.Christmas, 
#     F.to_date(dt.Christmas, 'yyyy-MM-dd HH:mm:ss').alias('Christmas date'), 
#     F.to_timestamp(dt.Christmas, 'yyyy-MM-dd HH:mm:ss').alias('Christmas timestamp')
#     ])# .show(truncate=False)

# Si on souhaite convertir la   
dt.withColumn('Christmas date', F.to_date(dt.Christmas, 'yyyy-MM-dd HH:mm:ss')) \
    .withColumn('Christmas timestamp', F.to_timestamp(dt.Christmas, 'yyyy-MM-dd HH:mm:ss')) 

# dt
# dt.dtypes


Christmas,Christmas date,Christmas timestamp
2019-12-25 13:30:00,2019-12-25,2019-12-25 13:30:00


## **Q : Convertir la col `Christmas`  25/Dec/2019 13:30:00 en `date` et en `timestamp`**

In [127]:
dt = spark.createDataFrame([('25/Dec/2019 13:30:00',)], ['Christmas'])
dt.show()
dt.dtypes


+--------------------+
|           Christmas|
+--------------------+
|25/Dec/2019 13:30:00|
+--------------------+



[('Christmas', 'string')]

In [130]:
dt.withColumn('Christmas date', F.to_date(dt.Christmas, 'dd/MMM/yyyy HH:mm:ss')) \
    .withColumn('Christmas timestamp', F.to_timestamp(dt.Christmas, 'dd/MMM/yyyy HH:mm:ss')) 

Christmas,Christmas date,Christmas timestamp
25/Dec/2019 13:30:00,2019-12-25,2019-12-25 13:30:00


## **Q : Convertir la col `Christmas`  12/25/2019 01:30:00 PM en `date` et en `timestamp`  [stackoverflow](https://stackoverflow.com/questions/51680587/how-can-i-account-for-am-pm-in-string-to-datetime-conversion-in-pyspark)** 

In [141]:
# La spécificité ici est que la date comporte AM/PM
dt = spark.createDataFrame([('12/25/2019 01:30:00 PM', )], ['Christmas'])
dt.show(truncate=False)
dt.dtypes


+----------------------+
|Christmas             |
+----------------------+
|12/25/2019 01:30:00 PM|
+----------------------+



[('Christmas', 'string')]

## **Q : Convertir la col `Date` du `df` et trouver combien de crimes ont été commis `12-Nov-2018` ?** 

In [None]:
# df.select(df.Date).show(truncate=False)
# df.select(df.Date).dtypes # string

## Convertir la column `Date` ? 
# df = df.withColumn('Date_', F.to_timestamp(df.Date, 'MM/dd/yyy hh:mm:ss a'))

df.select(df.Date_).dtypes # timestamp

# df

In [None]:
# On exécute la Qry sur la col `Date` (string) pr trouver combien de jours qui corresp. à `2018-11-12`
# Att. la Qry va mettre 2min et retourne 0 obs  
# df.filter(df.Date == F.lit('2018-11-12'))

# On exécute maintenant la Qry sur la col `Date_` (timestamp)
# Att. la Qry va mettre 2min et retourne 3 obs  

df.filter(df.Date_ == F.lit('2018-11-12'))


22/01/17 21:48:13 WARN MemoryStore: Not enough space to cache rdd_21_5 in memory! (computed 21.0 MiB so far)
22/01/17 21:48:13 WARN MemoryStore: Not enough space to cache rdd_21_8 in memory! (computed 21.5 MiB so far)
22/01/17 21:48:13 WARN MemoryStore: Not enough space to cache rdd_21_6 in memory! (computed 21.3 MiB so far)
22/01/17 21:48:13 WARN MemoryStore: Not enough space to cache rdd_21_7 in memory! (computed 21.5 MiB so far)
22/01/17 21:48:18 WARN MemoryStore: Not enough space to cache rdd_21_9 in memory! (computed 21.5 MiB so far)
22/01/17 21:48:18 WARN MemoryStore: Not enough space to cache rdd_21_10 in memory! (computed 21.3 MiB so far)
22/01/17 21:48:18 WARN MemoryStore: Not enough space to cache rdd_21_11 in memory! (computed 21.3 MiB so far)
22/01/17 21:48:18 WARN MemoryStore: Not enough space to cache rdd_21_12 in memory! (computed 21.4 MiB so far)
22/01/17 21:48:29 WARN MemoryStore: Not enough space to cache rdd_21_6 in memory! (computed 21.3 MiB so far)
22/01/17 21:48:2

ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location,Historical Wards 2003-2015,Zip Codes,Community Areas,Census Tracts,Wards,Boundaries - ZIP Codes,Police Districts,Police Beats,Date_
11505149,JB513151,11/12/2018 12:00:...,003XX S WHIPPLE ST,810,THEFT,OVER $500,STREET,False,False,1124,11,28,27,6,1156099,1898319,2018,11/19/2018 04:22:...,41.876776356,-87.702317641,"(41.876776356, -8...",11,21184,28,737,23,28,16,123,2018-11-12 00:00:00
11516594,JB528186,11/12/2018 12:00:...,049XX S PRAIRIE AVE,2826,OTHER OFFENSE,HARASSMENT BY ELE...,OTHER,False,False,224,2,3,38,26,1178879,1872259,2018,11/28/2018 04:14:...,41.804775828,-87.619472488,"(41.804775828, -8...",12,21192,4,449,9,10,24,107,2018-11-12 00:00:00
11540042,JB559262,11/12/2018 12:00:...,010XX N DEARBORN ST,1140,DECEPTIVE PRACTICE,EMBEZZLEMENT,CONVENIENCE STORE,True,False,1824,18,2,8,12,1175747,1907348,2018,03/16/2019 04:01:...,41.901133376,-87.629904979,"(41.901133376, -8...",22,14926,37,230,11,54,14,197,2018-11-12 00:00:00


In [147]:
dt.withColumn('Christmas date', F.to_date(dt.Christmas, 'MM/dd/yyyy hh:mm:ss a')) \
  .withColumn('Christmas timestamp', F.to_timestamp(dt.Christmas, 'MM/dd/yyyy hh:mm:ss a')) 

Christmas,Christmas date,Christmas timestamp
12/25/2019 01:30:...,2019-12-25,2019-12-25 13:30:00


## **Q : What is 3 days earlier that the oldest date and 3 days later than the most recent date?**

[('Date_', 'timestamp')]

# **Working with rows**

**Q : What are the top 10 number of reported crimes by Primary type, in descending order of occurence?**

In [32]:
# df.select('Arrest').distinct().show()
# df.select('Arrest').dtypes # boolean
(df.filter(df.Arrest == 'true').count() / df.select(df.Arrest).count())* 100

22/01/17 22:16:10 WARN MemoryStore: Not enough space to cache rdd_21_1 in memory! (computed 21.5 MiB so far)
22/01/17 22:16:10 WARN MemoryStore: Not enough space to cache rdd_21_5 in memory! (computed 1262.6 KiB so far)
22/01/17 22:16:10 WARN MemoryStore: Not enough space to cache rdd_21_3 in memory! (computed 21.0 MiB so far)
22/01/17 22:16:10 WARN MemoryStore: Not enough space to cache rdd_21_4 in memory! (computed 21.3 MiB so far)
22/01/17 22:16:10 WARN MemoryStore: Not enough space to cache rdd_21_6 in memory! (computed 21.3 MiB so far)
22/01/17 22:16:10 WARN MemoryStore: Not enough space to cache rdd_21_7 in memory! (computed 21.5 MiB so far)
22/01/17 22:16:10 WARN MemoryStore: Not enough space to cache rdd_21_9 in memory! (computed 21.5 MiB so far)
22/01/17 22:16:10 WARN MemoryStore: Not enough space to cache rdd_21_10 in memory! (computed 21.3 MiB so far)
22/01/17 22:16:11 WARN MemoryStore: Not enough space to cache rdd_21_4 in memory! (computed 1279.3 KiB so far)
22/01/17 22:16

27.602098565931637

22/01/18 02:48:18 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 152756 ms exceeds timeout 120000 ms
22/01/18 02:48:18 WARN SparkContext: Killing executors is not supported by current scheduler.


# Challenge

**Q : What percentage of reported crimes resulted in an arrest?**

**Q : What are the top 3 locations for reported crimes?**

# Built-in functions

In [None]:
from pyspark.sql import functions

In [None]:
print(dir(functions))

## String functions

**Q : Display the Primary Type column in lower and upper characters, and the first 4 characters of the column**

## Numeric functions


**Q : Show the oldest date and the most recent date**

# Working with joins

**Q : Download police station data**

**Q : The reported crimes dataset has only the district number. Add the district name by joining with the police station dataset**

# Challenge

**Q : What is the most frequently reported non-criminal activity?**

**Q : Using a bar chart, plot which day of the week has the most number of reported crime.**

# Lire à partir de HDFS
[SparkByExamples](https://sparkbyexamples.com/spark/spark-read-write-files-from-hdfs-txt-csv-avro-parquet-json/), [saggie](https://saagie.zendesk.com/hc/en-us/articles/360029759552-PySpark-Read-and-Write-Files-from-HDFS)

In [None]:
!hdfs dfs -ls -R /

drwxr-xr-x   - sayf supergroup          0 2021-09-09 16:35 /hadoop
-rw-r--r--   3 sayf supergroup  504941532 2021-09-09 16:35 /hadoop/access_log
drwxr-xr-x   - sayf supergroup          0 2021-12-24 02:11 /user
drwxr-xr-x   - sayf supergroup          0 2021-12-24 02:05 /user/sayf
drwxr-xr-x   - sayf supergroup          0 2021-12-24 02:05 /user/sayf/data
drwxr-xr-x   - sayf supergroup          0 2021-12-24 02:11 /user/spark


In [10]:
# Le sep est un espace
!hdfs dfs -cat /hadoop/access_log | more

10.223.157.186 - - [15/Jul/2009:14:58:59 -0700] "GET / HTTP/1.1" 403 202
10.223.157.186 - - [15/Jul/2009:14:58:59 -0700] "GET /favicon.ico HTTP/1.1" 404 
209
10.223.157.186 - - [15/Jul/2009:15:50:35 -0700] "GET / HTTP/1.1" 200 9157
10.223.157.186 - - [15/Jul/2009:15:50:35 -0700] "GET /assets/js/lowpro.js HTTP/1
.1" 200 10469
10.223.157.186 - - [15/Jul/2009:15:50:35 -0700] "GET /assets/css/reset.css HTTP/
1.1" 200 1014
10.223.157.186 - - [15/Jul/2009:15:50:35 -0700] "GET /assets/css/960.css HTTP/1.
1" 200 6206
10.223.157.186 - - [15/Jul/2009:15:50:35 -0700] "GET /assets/css/the-associates.
css HTTP/1.1" 200 15779
10.223.157.186 - - [15/Jul/2009:15:50:35 -0700] "GET /assets/js/the-associates.j
s HTTP/1.1" 200 4492
10.223.157.186 - - [15/Jul/2009:15:50:35 -0700] "GET /assets/js/lightbox.js HTTP
/1.1" 200 25960
10.223.157.186 - - [15/Jul/2009:15:50:36 -0700] "GET /assets/img/search-button.g
if HTTP/1.1" 200 168
10.223.157.186 - - [15/Jul/2009:15:50:36 -0700] "GET /assets/img/dummy/secondar

In [12]:
# df_log = spark.read.csv('hdfs://localhost:8020/hadoop/access_log', sep=' ')
df_log.show(truncate=False)

+--------------+---+---+---------------------+------+---------------------------------------------------------+---+-----+
|_c0           |_c1|_c2|_c3                  |_c4   |_c5                                                      |_c6|_c7  |
+--------------+---+---+---------------------+------+---------------------------------------------------------+---+-----+
|10.223.157.186|-  |-  |[15/Jul/2009:14:58:59|-0700]|GET / HTTP/1.1                                           |403|202  |
|10.223.157.186|-  |-  |[15/Jul/2009:14:58:59|-0700]|GET /favicon.ico HTTP/1.1                                |404|209  |
|10.223.157.186|-  |-  |[15/Jul/2009:15:50:35|-0700]|GET / HTTP/1.1                                           |200|9157 |
|10.223.157.186|-  |-  |[15/Jul/2009:15:50:35|-0700]|GET /assets/js/lowpro.js HTTP/1.1                        |200|10469|
|10.223.157.186|-  |-  |[15/Jul/2009:15:50:35|-0700]|GET /assets/css/reset.css HTTP/1.1                       |200|1014 |
|10.223.157.186|-  |-  |

# Ecrire dans HDFS

In [27]:
# Create data
test_write = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
test_write = spark.createDataFrame(test_write)

# Write into HDFS
test_write.write.csv("hdfs://localhost:8020/hadoop/test_write.csv", mode = 'overwrite')

                                                                                

In [28]:
!hdfs dfs -ls -R /

drwxr-xr-x   - sayf supergroup          0 2021-12-28 03:03 /hadoop
-rw-r--r--   3 sayf supergroup  504941532 2021-09-09 16:35 /hadoop/access_log
drwxr-xr-x   - sayf supergroup          0 2021-12-28 03:03 /hadoop/test_write.csv
-rw-r--r--   3 sayf supergroup          0 2021-12-28 03:03 /hadoop/test_write.csv/_SUCCESS
-rw-r--r--   3 sayf supergroup          8 2021-12-28 03:03 /hadoop/test_write.csv/part-00000-b613feb4-0e29-42e6-a2ec-fa6949d01a7d-c000.csv
-rw-r--r--   3 sayf supergroup          9 2021-12-28 03:03 /hadoop/test_write.csv/part-00001-b613feb4-0e29-42e6-a2ec-fa6949d01a7d-c000.csv
-rw-r--r--   3 sayf supergroup          8 2021-12-28 03:03 /hadoop/test_write.csv/part-00002-b613feb4-0e29-42e6-a2ec-fa6949d01a7d-c000.csv
-rw-r--r--   3 sayf supergroup         17 2021-12-28 03:03 /hadoop/test_write.csv/part-00003-b613feb4-0e29-42e6-a2ec-fa6949d01a7d-c000.csv
drwxr-xr-x   - sayf supergroup          0 2021-12-24 02:11 /user
drwxr-xr-x   - sayf supergroup          0 2021-12-24 02:05 /u