# Data processing with Apache Spark
### Student ID: []
### Student Name: []

### Apache Spark:
Spark is an open-source engine for large-scale data processing. Spark applications can be written in Scala, Python, or Java.
 <img src="https://spark.apache.org/images/spark-logo-trademark.png">

### Spark Basics:
The fundamental data unit in Spark is the RDD (resilient distributed dataset):
- It is Spark's core data abstraction.
- Spark can keep it in memory between operations instead of writing intermediate results to disk.
- It behaves like an array whose elements are split across the nodes of a cluster.

<img src="https://i0.wp.com/sparkbyexamples.com/wp-content/uploads/2020/08/rdd-creation.png?resize=1024%2C635&ssl=1">

- Resilient: fault tolerant; lost partitions can be recomputed from the operations that produced them when recovering from a failure.
- Distributed: processing takes place over several nodes in parallel.
- Dataset: the initial data can come from files or in-memory collections, or be created programmatically.

### Spark applications:
A Spark application is a series of operations that transform input RDDs into new RDDs (transformations) or into final values returned to the driver program (actions).

# Tasks
## Task 1: Building and submitting the first Spark application (words count):
In this example we read the text in a given file and count how many times each word occurs in the input text.
- SparkContext: the entry point to Spark; it represents the connection to the cluster and is used to create RDDs and submit jobs.
- SparkConf: holds the configuration settings (such as the application name) used to create the SparkContext.

In [None]:
from pyspark import SparkContext, SparkConf

In [None]:
conf = SparkConf().setAppName("Spark Lab words count Example")

In [None]:
sc = SparkContext(conf=conf)

In [None]:
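# Read the file as an RDD with one element per line
lines = sc.textFile("/home/bitnami/Labs/Lab2/Data/AliceInWonderLandPart1.txt")
# Split each line on spaces and flatten the results into a single RDD of words
tokenized = lines.flatMap(lambda line: line.split(" "))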

<img src="http://drive.google.com/uc?export=view&id=1TQgREgyPlZoPw69kl4HcE0xCO1rAaeMP">

In [None]:
tokenized.take(5)

['Alice', 'was', 'beginning', 'to', 'get']

In [None]:
# Pair each word with 1, then sum the 1s for each distinct word
wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)

In [None]:
list_words = wordCounts.collect()

In [None]:
list_words[0:5]

[('', 2), ('out', 3), ('on,', 1), ('was', 5), ('suddenly', 1)]

## Task 2: Filtering out infrequent words
Modify the previous example to keep only the words that occur more than N times (`countThreshold`):

In [None]:
countThreshold = 5
tokenized = lines.flatMap(lambda line: line.split(" "))
wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)
# Keep only the (word, count) pairs whose count exceeds the threshold
filtered_words = wordCounts.filter(lambda keyValuePair: keyValuePair[1] > countThreshold).collect()
print(filtered_words)

[('of', 8), ('her', 6), ('the', 12), ('it', 6), ('and', 6), ('a', 8), ('to', 7)]


In [None]:
sc.stop()

## Task 3: Modify Task 1 to return the longest *line*:
In this task you will modify Task 1 again. However, instead of counting words, you will return the longest line among the input lines.

In [None]:
# your code here

# Introduction to Spark SQL

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("Analyzing London crime data").getOrCreate()

### Reading external data as a DataFrame

In [None]:
data = spark.read.format("csv")\
            .option("header", "true")\
            .load("london_crime_by_lsoa.csv")

In [None]:
data.printSchema()

root
 |-- lsoa_code: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- major_category: string (nullable = true)
 |-- minor_category: string (nullable = true)
 |-- value: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)



### Number of rows

In [None]:
data.count()

1999

### Show the first rows

In [None]:
data.limit(5).show()

+---------+----------+--------------------+--------------------+-----+----+-----+
|lsoa_code|   borough|      major_category|      minor_category|value|year|month|
+---------+----------+--------------------+--------------------+-----+----+-----+
|E01001116|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|   Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
+---------+----------+--------------------+--------------------+-----+----+-----+



#### Cleaning data
* Drop rows that contain null values.
* Drop columns that we do not use in our analysis (the lsoa_code column).

In [None]:
data = data.dropna()
data.columns

['lsoa_code',
 'borough',
 'major_category',
 'minor_category',
 'value',
 'year',
 'month']

In [None]:
data = data.drop('lsoa_code')
data.show(5)

+----------+--------------------+--------------------+-----+----+-----+
|   borough|      major_category|      minor_category|value|year|month|
+----------+--------------------+--------------------+-----+----+-----+
|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|   Bromley|Violence Against ...|      Other violence|    0|2015|    5|
| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
+----------+--------------------+--------------------+-----+----+-----+
only showing top 5 rows



### Select the distinct boroughs in the borough column

In [None]:
total_boroughs = data.select("borough").distinct()
total_boroughs.show()

+--------------------+
|             borough|
+--------------------+
|             Croydon|
|          Wandsworth|
|              Bexley|
|             Lambeth|
|Barking and Dagenham|
|              Camden|
|           Greenwich|
|              Newham|
|       Tower Hamlets|
|            Hounslow|
|              Barnet|
|              Harrow|
|Kensington and Ch...|
|           Islington|
|               Brent|
|            Haringey|
|             Bromley|
|              Merton|
|         Westminster|
|             Hackney|
+--------------------+
only showing top 20 rows



### Get all the data for the Hackney borough only

In [None]:
hackney_data = data.filter(data.borough == "Hackney")
hackney_data.show(5)

+-------+--------------------+--------------------+-----+----+-----+
|borough|      major_category|      minor_category|value|year|month|
+-------+--------------------+--------------------+-----+----+-----+
|Hackney|     Criminal Damage|Criminal Damage T...|    0|2011|    6|
|Hackney|Violence Against ...|          Harassment|    1|2013|    2|
|Hackney|     Criminal Damage|Other Criminal Da...|    0|2011|    7|
|Hackney|Violence Against ...|        Wounding/GBH|    0|2013|   12|
|Hackney|  Theft and Handling|  Other Theft Person|    0|2016|    8|
+-------+--------------------+--------------------+-----+----+-----+
only showing top 5 rows



### Get all the data from the year 2014 onwards

In [None]:
# Your code here


### Get all the data for the years 2015 and 2016

In [None]:
data_2015_2016 = data[(data.year == "2015") | (data.year == "2016")]
data_2015_2016.limit(5).show()

+---------+--------------------+--------------------+-----+----+-----+
|  borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+-----+----+-----+
|  Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|  Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|   Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
+---------+--------------------+--------------------+-----+----+-----+



### Using the **isin** method

In [None]:
# year is a string column, so compare against string values
data_2015_2016 = data[data.year.isin(["2015", "2016"])]
data_2015_2016.limit(5).show()