# PySpark basics
Based on [this post](https://medium.com/@MariumFaheem/big-data-with-pyspark-58e7ee2b1299).

Data can be downloaded from [here](https://storage.googleapis.com/kaggle-data-sets/796863/1367247/compressed/stocks_price_final.csv.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20210910%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20210910T233450Z&X-Goog-Expires=259199&X-Goog-SignedHeaders=host&X-Goog-Signature=38e568168d5241628b57063e5590250073b5a8ad0f4f4a034ec76d6ef32d3ecde38434946f23318c15afd10427810a6d9f7ec0a3d0c3da62c4b0a2aebaf105bab1163a6f81a38802967ba6d439c59e62fc2e2e7779de602d73154b70482d86aa866806ba830c30af47025418536f25488030a880ba68e3fedb0c0c94d1828695a1fdef3b0e51c3c5f3daaf0f7f0c97cfc341f6a162af009fb7fda67e9294703f21a1b0c39d95faee4cc20c1d7c555a4ff5937f0779414e8303ef8894d95fcb25d34573ab0e6df3d89ccb11e0b18f87ea7127120d972f57672393112a47ff817525eeb2ae08a52b85a5cb8d672b006594ecb7cb52dc06fdc29702ad3972274ed6).

In [1]:
!pip install pyspark



In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

### Connecting to Spark cluster

In [5]:
sc = SparkContext("spark://carlos:7077", appName = "pyspark-basics")

In [6]:
spark = SparkSession.builder\
        .master("spark://localhost:7078")\
        .appName("pyspark-basics")\
        .getOrCreate()

### Loading different types of data

In [7]:
# Cretating a paralelized list in the Spark cluster
rdd_list = sc.parallelize([1, 2, 3, 4, 5])

In [8]:
# Getting the list from cluster
rdd_list.collect()

[1, 2, 3, 4, 5]

In [9]:
# What is the data type?
type(rdd_list)

pyspark.rdd.RDD

In [10]:
# Loading a tuple list
rdd_pair = sc.parallelize([
    ("jackets", 57),
    ("shirts", 33),
    ("jeans", 23),
    ("shirts", 23),
    ("jeans", 23),
    ("jeans", 13),
    ("jackets", 40),
])

In [11]:
# Printing the first record
rdd_pair.first()

('jackets', 57)

In [12]:
# Loading a text file
rdd_text = sc.textFile("./data/poem.txt")

In [13]:
# Printing the first 5 records
rdd_text.take(5)

['Una vez, al filo de una lúgubre media noche,',
 'mientras débil y cansado, en tristes reflexiones embebido,',
 'inclinado sobre un viejo y raro libro de olvidada ciencia,',
 'cabeceando, casi dormido,',
 'oyóse de súbito un leve golpe,']

In [15]:
# Loading a csv
rdd_csv = spark.read.csv("./data/stocks_price_final.csv", sep = ",", header = True)

In [16]:
# Counting number of records
rdd_csv.count()

1729034

In [17]:
# Printing columns and data types identified
rdd_csv.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



### RDD operations

In [18]:
# Using map for multiplying each record
rdd_list_x2 = rdd_list.map(lambda x: x * 2)
rdd_list_x2.collect()

[2, 4, 6, 8, 10]

In [19]:
# Using filter for getting a subset of the previously modified RDD
rdd_list_x2.filter(lambda x: x > 5).collect()

[6, 8, 10]

In [20]:
# Using map for splitting words from text
# By default, the result is something like a list of lists
rdd_text.map(lambda x: x.split(" ")).take(2)

[['Una', 'vez,', 'al', 'filo', 'de', 'una', 'lúgubre', 'media', 'noche,'],
 ['mientras',
  'débil',
  'y',
  'cansado,',
  'en',
  'tristes',
  'reflexiones',
  'embebido,']]

In [21]:
# What if I want to get a flat structure => Use flatMap function
rdd_text.flatMap(lambda x: x.split(" ")).take(15)

['Una',
 'vez,',
 'al',
 'filo',
 'de',
 'una',
 'lúgubre',
 'media',
 'noche,',
 'mientras',
 'débil',
 'y',
 'cansado,',
 'en',
 'tristes']

In [22]:
# Using reduceByKey for grouping by key and aggregating by value
rdd_pair_red = rdd_pair.reduceByKey(lambda x, y: x + y)
rdd_pair_red.collect()

[('jackets', 97), ('shirts', 56), ('jeans', 59)]

In [23]:
# What if I want the result sorted by key
rdd_pair_red.sortByKey(ascending = True).collect()

[('jackets', 97), ('jeans', 59), ('shirts', 56)]

### Putting all together

In [24]:
# Extending wordcount MapReduce algorithm for showing top 10 frecuent words 
rdd_text.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(ascending = False) \
    .map(lambda x: (x[1], x[0])) \
    .take(10)

[('de', 61),
 ('el', 44),
 ('en', 35),
 ('la', 31),
 ('a', 27),
 ('y', 26),
 ('que', 23),
 ('mi', 21),
 ('', 18),
 ('un', 18)]

### Stoping Spark context and session

In [26]:
spark.stop()
sc.stop()