# bigdata-LA1 tutorial

## 1. pyspark

You can set up the environment with `pip install pyspark`,
or follow the official documentation:
[Download Apache Spark™](https://spark.apache.org/downloads.html).
Either way, Spark runs on the JVM, so a Java runtime must also be installed.

```python
!pip install pyspark
```

## 2. Library

### 2.1 findspark

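`findspark` adds a local Spark installation (located through `SPARK_HOME`) to `sys.path` at runtime so that `pyspark` can be imported from an ordinary Python session.

```python
!pip install findspark
```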

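```python
import findspark
findspark.init()  # put the local Spark installation on sys.path

import pyspark

# getOrCreate() returns the already-running SparkContext (e.g. when this cell
# is re-run) instead of failing, or starts a new local one otherwise
conf = pyspark.SparkConf().setMaster('local').setAppName('bigdata-LA1')
sc = pyspark.SparkContext.getOrCreate(conf)
print("spark version:", sc.version)
```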

## 3. Spark RDD API
The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing collection (such as a Python list) in the driver program, and transforming it.

[Spark Python API Docs](https://spark.apache.org/docs/latest/api/python/index.html)

### 3.1 RDD
There are two ways to create RDDs: parallelizing an existing **collection** in your driver program, or referencing a dataset in an **external storage system**, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

#### parallelizing a collection

```python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)  # distribute the local list as an RDD
print(distData.count())          # 5
```

#### referencing a file

```python
# download the 2016 dataset of public ash trees treated by injection (Montréal open data);
# -L makes curl follow HTTP redirects
!curl -L http://donnees.ville.montreal.qc.ca/dataset/ebb813dd-a93f-4fb0-8137-80492a30a1fa/resource/0a5984e4-752f-401e-b2d9-aa0567535d39/download/frenepublicinjection2016.csv -o frenepublicinjection2016.csv
```

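```python
rdd = sc.textFile("frenepublicinjection2016.csv")  # one element per line
# count the lines by mapping each one to 1 and summing; same result as rdd.count()
# (the CSV header line is included in the total)
count = rdd.map(lambda x: 1).reduce(lambda a, b: a + b)
print(count)
```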

### 3.2 RDD Operations
RDDs support two types of operations: **transformations**, which create a new dataset from an existing one, and **actions**, which return a value to the driver program after running a computation on the dataset. Transformations are lazy: Spark only records them and computes a result when an action requires one.

```python
# map(), reduceByKey(), collect(): count how often each value occurs
from operator import add
data = [1, 2, 3, 4, 5, 5, 3, 1]

distData = sc.parallelize(data)
newData = distData.map(lambda x: (x, 1)).reduceByKey(add)
print(newData.collect())
```

```python
# filter()
newData = distData.filter(lambda x: x != 5)
print(newData.collect())
# distinct
newData = newData.distinct()
print(newData.collect())
# intersection
newData = newData.intersection(distData)
print(newData.collect())
# reduce
print(newData.reduce(add))
```

```python
print(rdd.first())
print(rdd.take(2))
```

```python
# stop the SparkContext; the DataFrame section below starts its own SparkSession
sc.stop()
```

[Learn more in the RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

## 4. Spark DataFrame API

### 4.1 SparkSession

```python
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("bigdata-LA1") \
    .getOrCreate()
print("spark version:", spark.version)
```

### 4.2 Creating DataFrames

```python
df = spark.read.load("frenepublicinjection2016.csv",
                     format="csv", sep=",", inferSchema="true", header="true", quote='"')

print(df.count())
```

#### Untyped Dataset Operations

```python
# printSchema
df.printSchema()
```

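```python
# filter: keep rows whose 7th column (index 6) is not empty;
# rows where it is NULL are dropped too, since NULL != '' evaluates to NULL
new_df = df.filter(df[6] != '')
print(new_df.count())
```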

```python
# select distinct
df_select = new_df.select(new_df[6])
df_select.show(10)
df_select.distinct().show(10)
```

```python
# groupBy: number of rows for each value of the first column
df_with_count = new_df.groupBy(new_df[0]).count()
df_with_count.show(10)
```

```python
# sort: df_select holds only column 6, whose header is Nom_parc (park name)
df_select.sort(df_select.Nom_parc).show(10)
```

```python
# intersect: rows common to df_select and a DataFrame built from its first 5 rows
df_select.intersect(spark.createDataFrame(df_select.take(5))).show()
```