# Introduction

Spark is a tool for data processing. It can also be used for data storage, but that is not its main capability.

Spark is a great tool for Big Data because it processes data on a cluster. Data processed by Spark can be partitioned, which enables scalability, among other things. Another great feature of this framework is that data can be replicated across the cluster. This keeps data available even if one node of the cluster stops processing (dies or something XP)

Looking further into partitioning: it is the process of separating data into groups, each of which is handled by its own executor. This is a great way to reduce the reprocessing of data.

We will learn PySpark, the Python API for Spark, which is one of the languages Spark supports.

## Components

* Machine Learning (MLlib)
* SQL (Spark SQL)
* Stream processing (Spark Streaming)
* Graph processing (GraphX)

## Structure

* **Driver**: initializes the SparkSession and requests computational resources from the Cluster Manager; transforms operations into DAGs (Directed Acyclic Graphs); distributes tasks across the executors.

* **Manager**: manages the cluster's resources. There are four different managers: built-in (standalone), YARN, Mesos, and Kubernetes.

* **Executor**: runs on each node of the cluster, executing tasks.

## Transformations and Actions

The DataFrame is the basic unit we work with in Spark. DataFrames are immutable, a characteristic that contributes to fault tolerance.
When we run a process in Spark, we have two basic kinds of operation: Transformations and Actions.

Transformations generate a new DataFrame. The processing of a transformation only occurs once an Action is called (Lazy Evaluation).

![Lazy Evaluation](images/Lazy.png "Lazy Evaluation")
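As a rough analogy in plain Python (this is not Spark code), a generator behaves like a lazy transformation: defining it does no work, and the work only happens when something consumes it, much like an Action. The names `tripled` and `log` are made up for this sketch.

```python
# Plain-Python analogy for lazy evaluation; no Spark involved.
log = []  # records when elements are actually processed


def tripled(values):
    # Generator: the body runs only when a consumer asks for items.
    for v in values:
        log.append(f"processing {v}")
        yield v * 3


pipeline = tripled([1, 2, 3])    # "transformation": nothing has run yet
steps_before_action = len(log)   # still zero at this point

result = list(pipeline)          # "action": forces the computation
print(steps_before_action, result, len(log))
```

In Spark the same idea lets the engine see the whole chain of transformations before running anything, so it can plan the work as a whole.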

We have two main types of transformations: Narrow and Wide. They indicate whether each output partition is computed from a single input partition (Narrow) or from data spread across several partitions (Wide).
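The difference can be sketched with plain Python lists standing in for partitions. This is a toy model, not Spark's implementation: the data, the helper `shuffle_by_key`, and its character-sum partitioner are assumptions made for illustration.

```python
from collections import defaultdict

# Two toy "partitions" of (product, quantity) records.
partitions = [
    [("pen", 9), ("apple", 12)],
    [("pen", 13), ("pineapple", 22)],
]

# Narrow: every output partition is built from exactly one input partition,
# so no data has to move between partitions.
doubled = [[(k, v * 2) for k, v in part] for part in partitions]


def shuffle_by_key(parts, num_partitions):
    """Wide: records with the same key may live in different partitions,
    so they are first routed (shuffled) to a common partition, then summed."""
    out = [defaultdict(int) for _ in range(num_partitions)]
    for part in parts:
        for key, value in part:
            idx = sum(map(ord, key)) % num_partitions  # toy deterministic partitioner
            out[idx][key] += value
    return [dict(d) for d in out]


totals = shuffle_by_key(partitions, 2)
print(doubled)
print(totals)
```

Here the two `pen` records start in different partitions and only meet after the shuffle, which is why wide transformations tend to be the more expensive kind.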

## Components

* Job
* Stage
* Task

![Spark Components](images/Contents.png "Spark Components")

## Big Data Formats

Modern data formats are open: any capable framework can read them, so they are decoupled from the reading tools. Throughout this course, we will use **parquet** files. These formats are binary and compressed. Moreover, they support schemas and are well suited to clustering and partitioning.

## Installation and initial configuration

To install Spark, go to the Spark website and copy the download link. Then, download the archive in the terminal with the **wget** command and extract it. After this, move the extracted folder into the **/opt** folder and add the required environment variables to the **~/.bashrc** file.

Once the installation and variables are set up, use the following URL in the browser to validate: **http://localhost:8080/** (this is the master's web UI, so it responds only while the master is running).

Finally, to start Spark from the terminal, run the following commands (in recent Spark releases the second script is named **start-worker.sh**):
* start-master.sh
* /opt/spark/sbin/start-slave.sh spark://localhost:7077

Now, one can access the Spark shell (Python) via the **pyspark** command. For this course, you must also install **numpy** and **pandas**.

*P.S.: For PySpark only, just use **pip install pyspark** ʕ•ᴥ•ʔ*

![Spark Successful Install](images/install_success.png "Spark Successful Installation")

For more in-depth reading, see: https://www.bmc.com/blogs/jupyter-notebooks-apache-spark/

# Data Structures

The Spark framework works with three data structures: **RDDs (Resilient Distributed Datasets)**, **Datasets**, and **DataFrames**.

RDDs are the most basic structure that Spark can process, and they are normally:

* A low-level, basic data structure
* Complex and wordy - one might need a lot of code to process an RDD
* Not automatically optimized by Spark

DataFrames and Datasets are easier to manipulate, and we already know their tabular structure. However, Datasets are not available in PySpark.

## RDD 

One can create an **RDD** in the shell by calling the method **sc.parallelize**, which takes a list as an argument.
Ex: ***nums = sc.parallelize([1,2,3,4,5,6,7,8,9,10])***

Another way to create such an object is to initialize a PySpark session explicitly in code. The next cell has such code.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName('SparkByExamples.com').getOrCreate()

#Create RDD from parallelize    
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd = spark.sparkContext.parallelize(data)



This object has a variety of methods, such as **take**, **top**, **collect**, **sum**, **mean**, **stdev**, etc.

Moreover, using Python lambda functions one can apply transformations, such as **filter** and **map**, on the **RDD** and then perform an action to get the result.

In [2]:
print(f'Take method: {rdd.take(5)}')

print(f'Count method: {rdd.count()}')

print(f'Standard deviation method: {rdd.stdev()}')

#Filter using lambda functions
rdd_filtered = rdd.filter(lambda x: x > 4)
print(f'Collecting filtered RDD: {rdd_filtered.collect()}')
#Collect method to collect data. Not a great idea for actual bigdata ｡◕‿◕｡

#Mapping using lambda functions
rdd_mapped = rdd.map(lambda x: x * 3)
print(f'Collecting mapped RDD: {rdd_mapped.collect()}')

                                                                                

Take method: [1, 2, 3, 4, 5]
Count method: 12
Standard deviation method: 3.452052529534663
Collecting filtered RDD: [5, 6, 7, 8, 9, 10, 11, 12]
Collecting mapped RDD: [3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36]


Two **RDDs** can be combined with methods similar to those of mathematical sets. We can perform **union**, **intersection**, **subtract**, **cartesian**, etc. Unlike a mathematical union, **union** keeps duplicate elements.

In [3]:
data2 = [10, 11, 12, 13, 14, 15]
rdd2 = spark.sparkContext.parallelize(data2)

union = rdd.union(rdd2)
print(f'Union of both RDDs: {union.collect()}')
      
inter = rdd.intersection(rdd2)
print(f'Intersection of both RDDs: {inter.collect()}')

sub = rdd.subtract(rdd2)
print(f'Subtraction of both RDDs: {sub.collect()}')

cartesian_prod = rdd.cartesian(rdd2)
print(f'Cartesian product of both RDDs: {cartesian_prod.collect()}')

Union of both RDDs: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 10, 11, 12, 13, 14, 15]
Intersection of both RDDs: [10, 12, 11]
Subtraction of both RDDs: [2, 4, 6, 8, 1, 3, 5, 7, 9]
Cartesian product of both RDDs: [(1, 10), (1, 11), (1, 12), (1, 13), (1, 14), (1, 15), (2, 10), (2, 11), (2, 12), (2, 13), (2, 14), (2, 15), (3, 10), (3, 11), (3, 12), (3, 13), (3, 14), (3, 15), (4, 10), (4, 11), (4, 12), (4, 13), (4, 14), (4, 15), (5, 10), (5, 11), (5, 12), (5, 13), (5, 14), (5, 15), (6, 10), (6, 11), (6, 12), (6, 13), (6, 14), (6, 15), (7, 10), (7, 11), (7, 12), (7, 13), (7, 14), (7, 15), (8, 10), (8, 11), (8, 12), (8, 13), (8, 14), (8, 15), (9, 10), (9, 11), (9, 12), (9, 13), (9, 14), (9, 15), (10, 10), (10, 11), (10, 12), (10, 13), (10, 14), (10, 15), (11, 10), (11, 11), (11, 12), (11, 13), (11, 14), (11, 15), (12, 10), (12, 11), (12, 12), (12, 13), (12, 14), (12, 15)]


Now let us take a look at an example. Say the previous cartesian product is the sales registry of a given store. The first entry of each tuple is the customer code, and the second entry is the number of cucumbers they bought in our store. (◕‿◕✿)

We can extract **keys** and **values** from our registry, and create different **RDDs** to store them.

In [4]:
cucumbaLTDA_registry = cartesian_prod

customers = cucumbaLTDA_registry.keys().distinct()
print(f'Distinct customers that bought cucumbers on our store: {customers.collect()}')

total_value = cucumbaLTDA_registry.values().sum()
print(f'Total number of cucumber bought: {total_value}.\nOH YEAH! That\'s a lot of CUCUMBAS! (づ￣ ³￣)づ')

#This recurrence count is not meaningful because we've created our registry via a cartesian product (ಥ﹏ಥ)
print(f'Count how many times each customer passed by our store: {cucumbaLTDA_registry.countByKey()}')

Distinct customers that bought cucumbers on our store: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Total number of cucumber bought: 900.
OH YEAH! That's a lot of CUCUMBAS! (づ￣ ³￣)づ
Count how many times each customer passed by our store: defaultdict(<class 'int'>, {1: 6, 2: 6, 3: 6, 4: 6, 5: 6, 6: 6, 7: 6, 8: 6, 9: 6, 10: 6, 11: 6, 12: 6})


Let's say that we also have a registry of debts for some customers. We may create a **joined RDD**, which keeps the purchase registry alongside the number of unpaid cucumbers for each customer. We can also use **subtractByKey** to keep only the purchases of customers without debts.

In [5]:
debts = [(1,3),(5,10),(10,2)]
unpaid_cucumbers = spark.sparkContext.parallelize(debts)

comp_registry = cucumbaLTDA_registry.join(unpaid_cucumbers)
print(f'The complete registry obtained by the join of our sales registry and our debts is:\n {comp_registry.collect()}')

purchase_by_no_debt = cucumbaLTDA_registry.subtractByKey(unpaid_cucumbers)
print(f'Purchases made by clients without debts: {purchase_by_no_debt.collect()}')

The complete registry obtained by the join of our sales registry and our debts is:
 [(10, (10, 2)), (10, (11, 2)), (10, (12, 2)), (10, (13, 2)), (10, (14, 2)), (10, (15, 2)), (1, (10, 3)), (1, (11, 3)), (1, (12, 3)), (1, (13, 3)), (1, (14, 3)), (1, (15, 3)), (5, (10, 10)), (5, (11, 10)), (5, (12, 10)), (5, (13, 10)), (5, (14, 10)), (5, (15, 10))]
Purchases made by clients without debts: [(2, 10), (2, 11), (2, 12), (2, 13), (2, 14), (2, 15), (4, 10), (4, 11), (4, 12), (4, 13), (4, 14), (4, 15), (6, 10), (6, 11), (6, 12), (6, 13), (6, 14), (6, 15), (8, 10), (8, 11), (8, 12), (8, 13), (8, 14), (8, 15), (12, 10), (12, 11), (12, 12), (12, 13), (12, 14), (12, 15), (3, 10), (3, 11), (3, 12), (3, 13), (3, 14), (3, 15), (7, 10), (7, 11), (7, 12), (7, 13), (7, 14), (7, 15), (9, 10), (9, 11), (9, 12), (9, 13), (9, 14), (9, 15), (11, 10), (11, 11), (11, 12), (11, 13), (11, 14), (11, 15)]


## DataFrame

**DataFrames** are the data structure that we will focus on in this course. Some bullet points that are of great importance about **DataFrames**:

* Tabular structure. Excel-like data ☜(ﾟヮﾟ☜)
* Immutable
* Have a known schema
* Preserved lineage. Transformations are saved step by step
* Columns may have different data types
* Common methods such as **group by**, **order by** and **filter**
* ***Extremely optimized on Spark***

Without further ado, let's create a **DataFrame** and play with it. As you can see, to create a **DF** one passes the data and its schema as arguments.

In [6]:
data3 = [('John', 15),('Mary', 14),('James', 12)]

schema = "Name STRING, Age INT" #Model of a schema accepted by spark

df = spark.createDataFrame(data3, schema) #Here we pass the data and its schema

df.show()
df.show(1)

+-----+---+
| Name|Age|
+-----+---+
| John| 15|
| Mary| 14|
|James| 12|
+-----+---+

+----+---+
|Name|Age|
+----+---+
|John| 15|
+----+---+
only showing top 1 row



In [7]:
schema2 = "Product STRING, Quantity INT"
data4 = [('Pen', 9),('Pineappple', 22),('Apple', 12),('Pen', 13)]

df2 = spark.createDataFrame(data4, schema2)

df2.show()

+----------+--------+
|   Product|Quantity|
+----------+--------+
|       Pen|       9|
|Pineappple|      22|
|     Apple|      12|
|       Pen|      13|
+----------+--------+



To get useful information about our **DF**, one can read some of the **DF's attributes**. For instance:

In [8]:
print(f'df2\'s schema:{df2.schema}') ##Will return the previously described schema

print(f'df2\'s columns:{df2.columns}') ##Will return those previously described columns

df2's schema:StructType(List(StructField(Product,StringType,true),StructField(Quantity,IntegerType,true)))
df2's columns:['Product', 'Quantity']


The new **DF** contains data on purchased products. One may need to see the total sum of a particular product, for instance. For that, we will use the methods **groupBy** and **agg** (*aggregate*). See below for an example of the **agg** method with sum and mean values.

In [9]:
# Note: importing sum here shadows Python's built-in sum in this session
from pyspark.sql.functions import sum, mean
df2.groupBy("Product").agg(sum("Quantity")).show()
df2.groupBy("Product").agg(mean("Quantity")).show()

+----------+-------------+
|   Product|sum(Quantity)|
+----------+-------------+
|       Pen|           22|
|     Apple|           12|
|Pineappple|           22|
+----------+-------------+

+----------+-------------+
|   Product|avg(Quantity)|
+----------+-------------+
|       Pen|         11.0|
|     Apple|         12.0|
|Pineappple|         22.0|
+----------+-------------+



We can use the method **select** to choose columns, or even add an expression that computes a new column.

In [10]:
from pyspark.sql.functions import expr
df2.select("Product", "Quantity", expr("Quantity - 4"), expr("Quantity * 0.5")).show()

+----------+--------+--------------+----------------+
|   Product|Quantity|(Quantity - 4)|(Quantity * 0.5)|
+----------+--------+--------------+----------------+
|       Pen|       9|             5|             4.5|
|Pineappple|      22|            18|            11.0|
|     Apple|      12|             8|             6.0|
|       Pen|      13|             9|             6.5|
+----------+--------+--------------+----------------+



It is easy to see that the module **pyspark.sql.functions** will be widely used throughout this course. For this reason, it is useful to know the available functions. See ***https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html*** for further reading.

### DataFrame Example

Let us take a look at an example file. In the folder ***Files***, there is a ***csv*** file called **despachantes**. It contains data on the sales made by each dispatcher, along with their city and a date.

First, we must load this file into our environment. To do so, we can use the specific **spark.read.csv**, or the generic **spark.read.load**.

In [11]:
from pyspark.sql.types import *

arqschema = "id INT, name STRING, status STRING, city STRING, sales INT, date STRING"

dispatchers = spark.read.csv('Files/despachantes.csv', header = False, schema = arqschema)

print('Load file with determined schema:')
dispatchers.show()
print(f'df shcema: {dispatchers.schema}\n')

dispatchers_autoschema = spark.read.load('Files/despachantes.csv', header = False, format = 'csv', sep = ',', inferSchema = True)

print('Load file with infered schema:')
dispatchers_autoschema.show()
print(f'df shcema: {dispatchers_autoschema.schema}')

Load file with determined schema:
+---+-------------------+------+-------------+-----+----------+
| id|               name|status|         city|sales|      date|
+---+-------------------+------+-------------+-----+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|   45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|   65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|   54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|    0|2020-09-05|
+---+-------------------+------+-------------+-----+----------+

df shcema: StructType(List(StructField(id,IntegerType,true),StructFie

Remember, we can filter data or apply any function from the module **pyspark.sql.functions**. For example, let us create a filtered **DF** containing only rows where sales are greater than 20. For more complex filters, **Spark** provides the logical operators **& (AND)**, **| (OR)**, and **~ (NOT)**; wrap each condition in parentheses when combining them.


In [12]:
from pyspark.sql.functions import col

dispatchers_goal = dispatchers.select('id', 'name', 'sales').where(col('sales') > 20) 
#It is always good to stress that this df has not been calculated.
dispatchers_goal.show() #Only now, the calculation occurs! Lazy Evaluation ¯\_(ツ)_/¯

+---+-------------------+-----+
| id|               name|sales|
+---+-------------------+-----+
|  1|   Carminda Pestana|   23|
|  2|    Deolinda Vilela|   34|
|  3|   Emídio Dornelles|   34|
|  4|Felisbela Dornelles|   36|
|  6|   Matilde Rebouças|   22|
|  7|    Noêmia   Orriça|   45|
|  8|      Roque Vásquez|   65|
|  9|      Uriel Queiroz|   54|
+---+-------------------+-----+



Moreover, one may need to change a column name or a column type. Since **DFs** are immutable objects in **Spark**, we must assign the result of these changes to a new **DF**.

In [13]:
from pyspark.sql.functions import to_timestamp

dispatchers_v2 = dispatchers.withColumnRenamed('name', 'names').withColumn('date', to_timestamp(col('date'), 'yyyy-MM-dd'))
dispatchers_v2.show()
print(f'df shcema: {dispatchers_v2.schema}\n')

+---+-------------------+------+-------------+-----+-------------------+
| id|              names|status|         city|sales|               date|
+---+-------------------+------+-------------+-----+-------------------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05 00:00:00|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|   45|2019-10-05 00:00:00|
|  8|      Roque Vásquez| Ativo| Porto Alegre|   65|2020-03-05 00:00:00|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|   54|2018-05-05 00:00:00|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|    0|2020-09-05 00:00:00|
+---+-------------------+------+-------------+-----

So far, we've only used the **show** action. Now we will see some of the other actions available in **Spark**.

In [14]:
#Take a number of rows, and returns a list of them
kool_log_separator = '-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-'
print(f'First two rows from dispatchers_v2:\n {dispatchers_v2.take(2)}') 

print(kool_log_separator)

#Collect all rows in a list manner
print(f'\nEvery row from dispatchers_v2 in the form of a list:\n {dispatchers_v2.collect()}')

print(kool_log_separator)

print(f'\nCount the number of rows: {dispatchers_v2.count()}')

print(kool_log_separator)



First two rows from dispatchers_v2:
 [Row(id=1, names='Carminda Pestana', status='Ativo', city='Santa Maria', sales=23, date=datetime.datetime(2020, 8, 11, 0, 0)), Row(id=2, names='Deolinda Vilela', status='Ativo', city='Novo Hamburgo', sales=34, date=datetime.datetime(2020, 3, 5, 0, 0))]
-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-

Every row from dispatchers_v2 in the form of a list:
 [Row(id=1, names='Carminda Pestana', status='Ativo', city='Santa Maria', sales=23, date=datetime.datetime(2020, 8, 11, 0, 0)), Row(id=2, names='Deolinda Vilela', status='Ativo', city='Novo Hamburgo', sales=34, date=datetime.datetime(2020, 3, 5, 0, 0)), Row(id=3, names='Emídio Dornelles', status='Ativo', city='Porto Alegre', sales=34, date=datetime.datetime(2020, 2, 5, 0, 0)), Row(id=4, names='Felisbela Dornelles', status='Ativo', city='Porto Alegre', sales=36, date=datetime.datetime(2020, 2, 5, 0, 0)), Row(id=5, names='Graça Ornell

It is easy to see that transformations with **PySpark** can be kind of clunky. For this reason, it is often better to load our data with **Spark** first and transform it afterwards with **SQL scripts**. This is one of the main ideas of the **ELT** process.

Finally, we will write our altered **DF**, **dispatchers_v2**, as a **.parquet** file.

*P.S.: You can give different formats for the write action*

In [15]:
import pyspark.sql.utils
import os, sys

possible_formats = ['parquet', 'json', 'csv', 'orc']

file = 'dispatchers_v2'

for f in possible_formats:
    path = f'Exports/{f}_{file}'
    try:
        dispatchers_v2.write.format(f).save(path)
        dirr = os.listdir(path)
        for d in dirr:
            if '_SUCCESS' not in d and 'crc' not in d:
                print(f'File saved as {d}')
        
    except pyspark.sql.utils.AnalysisException:
        print('Unable to save data because of an AnalysisException.')
        dirr = os.listdir(path)
        for d in dirr:
            if '_SUCCESS' not in d and 'crc' not in d:
                print(f'File already saved as {d}')

Unable to save data because of an AnalysisException.
File already saved as part-00000-67e3502e-f992-4fbb-9140-ebd65f52c1de-c000.snappy.parquet
Unable to save data because of an AnalysisException.
File already saved as part-00000-1a93a3af-8e27-490f-afdd-c22f02c02418-c000.json
File already saved as .ipynb_checkpoints
Unable to save data because of an AnalysisException.
File already saved as part-00000-99518b3d-8038-46ee-b085-d1af7568696d-c000.csv
File already saved as .ipynb_checkpoints
Unable to save data because of an AnalysisException.
File already saved as part-00000-8c7fc7fe-8114-4521-a995-df592a40147d-c000.snappy.orc


These files can be loaded back into our **Spark** environment using the **read** method.

In [16]:
random_formats_structure = {}

for f in possible_formats:
    path = f'Exports/{f}_{file}'
    dirr = os.listdir(path)
    for d in dirr:
        if '_SUCCESS' not in d and 'crc' not in d and 'ipynb' not in d:
            print(f'Loading {d}')
            try:
                random_formats_structure[d] = spark.read.format(f).load(path + '/' + d)
            except pyspark.sql.utils.AnalysisException:
                print('Unexpected Analysis Exception')

Loading part-00000-67e3502e-f992-4fbb-9140-ebd65f52c1de-c000.snappy.parquet
Loading part-00000-1a93a3af-8e27-490f-afdd-c22f02c02418-c000.json
Loading part-00000-99518b3d-8038-46ee-b085-d1af7568696d-c000.csv
Loading part-00000-8c7fc7fe-8114-4521-a995-df592a40147d-c000.snappy.orc


In [17]:
for structure in random_formats_structure:
    print(structure)
    random_formats_structure[structure].show()
    print(kool_log_separator + '\n')

part-00000-67e3502e-f992-4fbb-9140-ebd65f52c1de-c000.snappy.parquet
+---+-------------------+------+-------------+-----+-------------------+
| id|              names|status|         city|sales|               date|
+---+-------------------+------+-------------+-----+-------------------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05 00:00:00|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|   45|2019-10-05 00:00:00|
|  8|      Roque Vásquez| Ativo| Porto Alegre|   65|2020-03-05 00:00:00|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|   54|2018-05-05 00:00:00|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|    0|2020

## Activity

In the folder **Atividades** you will find five **parquet** files. You are required to explore the data and deliver queries that produce the following results:

* A query that shows three columns - Name, State and Status - with their respective data. They need to be in this particular order;

* A second query that shows only clients with **platinum** and **gold** Status;

* Calculate the percentage of sales for each Status.

You do not necessarily need to use every file. You can see the relation between them in the table schema below.

![Table Scheme](images/activity_schema.png "Table Scheme")

First of all, we need to import all required files. We can see which files we will need by analyzing the table schema above.

For the first question, we need the columns **Customer**, **State**, and **Status**; therefore, we must load the data from the **Customers** table. For the second assignment, we only need to filter the previously created table. For the last assignment, we need to aggregate sales grouped by **Status**; hence, we must also load the **Sales** table.

Considering everything, we only need to load two tables from our schema.

In [18]:
clients = spark.read.format('parquet').load('Files/Atividades/Clientes.parquet')

sales = spark.read.format('parquet').load('Files/Atividades/Vendas.parquet')

In [19]:
clients.show()

+---------+--------------------+------+------+--------+
|ClienteID|             Cliente|Estado|Genero|  Status|
+---------+--------------------+------+------+--------+
|        1|Adelina Buenaventura|    RJ|     M|  Silver|
|        2|        Adelino Gago|    RJ|     M|  Silver|
|        3|     Adolfo Patrício|    PE|     M|  Silver|
|        4|    Adriana Guedelha|    RO|     F|Platinum|
|        5|       Adélio Lisboa|    SE|     M|  Silver|
|        6|       Adérito Bahía|    MA|     M|  Silver|
|        7|       Aida Dorneles|    RN|     F|  Silver|
|        8|   Alarico Quinterno|    AC|     M|  Silver|
|        9|    Alberto Cezimbra|    AM|     M|  Silver|
|       10|    Alberto Monsanto|    RN|     M|    Gold|
|       11|       Albino Canela|    AC|     M|  Silver|
|       12|     Alceste Varanda|    RR|     F|  Silver|
|       13|  Alcides Carvalhais|    RO|     M|  Silver|
|       14|        Aldo Martins|    GO|     M|  Silver|
|       15|   Alexandra Tabares|    MG|     F|  

In [20]:
sales.show()

+--------+----------+---------+---------+--------+
|VendasID|VendedorID|ClienteID|     Data|   Total|
+--------+----------+---------+---------+--------+
|       1|         1|       91| 1/1/2019|  8053.6|
|       2|         6|      185| 1/1/2020|   150.4|
|       3|         7|       31| 2/1/2020|  6087.0|
|       4|         5|       31| 2/1/2019| 13828.6|
|       5|         5|       31| 3/1/2018|26096.66|
|       6|         5|       31| 4/1/2020| 18402.0|
|       7|         5|       31| 6/1/2019|  7524.2|
|       8|         5|      186| 6/1/2019| 12036.6|
|       9|         7|       91| 6/1/2020| 2804.75|
|      10|         2|      202| 6/1/2020|  8852.0|
|      11|         7|       58| 8/1/2019|16545.25|
|      12|         7|       58| 9/1/2018|11411.88|
|      13|         7|       58|10/1/2019| 15829.7|
|      14|         3|      249|12/1/2020| 6154.36|
|      15|         4|      249|12/1/2018| 3255.08|
|      16|         7|      192|13/1/2020| 2901.25|
|      17|         2|       79|

For the first requirement, we rename the columns and use a simple **select** to get the required columns from **clients**.

In [21]:
small_df = clients.withColumnRenamed('Cliente', 'Name').withColumnRenamed('Estado', 'State').select('Name', 'State', 'Status')
small_df.show()

+--------------------+-----+--------+
|                Name|State|  Status|
+--------------------+-----+--------+
|Adelina Buenaventura|   RJ|  Silver|
|        Adelino Gago|   RJ|  Silver|
|     Adolfo Patrício|   PE|  Silver|
|    Adriana Guedelha|   RO|Platinum|
|       Adélio Lisboa|   SE|  Silver|
|       Adérito Bahía|   MA|  Silver|
|       Aida Dorneles|   RN|  Silver|
|   Alarico Quinterno|   AC|  Silver|
|    Alberto Cezimbra|   AM|  Silver|
|    Alberto Monsanto|   RN|    Gold|
|       Albino Canela|   AC|  Silver|
|     Alceste Varanda|   RR|  Silver|
|  Alcides Carvalhais|   RO|  Silver|
|        Aldo Martins|   GO|  Silver|
|   Alexandra Tabares|   MG|  Silver|
|      Alfredo Cotrim|   SC|  Silver|
|     Almeno Figueira|   SC|  Silver|
|      Alvito Peralta|   AM|  Silver|
|     Amadeu Martinho|   RN|  Silver|
|      Amélia Estévez|   PE|  Silver|
+--------------------+-----+--------+
only showing top 20 rows



Next, we were asked to filter the new **small_df** by **Status**, keeping rows where **Status** is either **Gold** or **Platinum**.

In [22]:
required_status = ['Gold', 'Platinum']

filtered_small_df = small_df.filter(small_df.Status.isin(required_status))
## As an alternative, with col imported from pyspark.sql.functions, you could use
#small_df.where((col("Status") == "Gold") | (col("Status") == "Platinum"))
filtered_small_df.show()

+-------------------+-----+--------+
|               Name|State|  Status|
+-------------------+-----+--------+
|   Adriana Guedelha|   RO|Platinum|
|   Alberto Monsanto|   RN|    Gold|
|      Anna Carvajal|   RS|    Gold|
|      Bento Quintão|   SP|    Gold|
|      Carminda Dias|   AM|    Gold|
|      Cláudio Jorge|   TO|    Gold|
|    Dionísio Saltão|   PR|    Gold|
|   Firmino Meireles|   AM|    Gold|
|      Flor Vilanova|   CE|Platinum|
|Honorina Villaverde|   PE|    Gold|
|    Ibijara Botelho|   RR|Platinum|
|  Iracema Rodríguez|   BA|    Gold|
|         Joana Ataí|   GO|Platinum|
+-------------------+-----+--------+



The last assignment requires the sales figures grouped by **Status**. We begin by joining our **clients** table with **sales** on the key **ClienteID** and summing **Total** for each **Status**; dividing each sum by the grand total then gives the percentage.

In [23]:
clients.join(sales, clients.ClienteID == sales.ClienteID, "left").select('Status', 'Total').groupby('Status').agg(sum('Total')).orderBy('Status').show()

+--------+----------+
|  Status|sum(Total)|
+--------+----------+
|    Gold|  27286.69|
|Platinum|  12584.68|
|  Silver|3014291.36|
+--------+----------+



# Spark SQL

**Spark SQL** uses the **Metastore** from **Hive** to create databases, which lets us work with data through a high-level language, the **Structured Query Language**. For this reason, **Spark SQL** looks up data in the tables of a database. These tables are persistent objects, i.e., they remain in our database after our last **Spark Session** ends.

We must stress that **Tables** and **DataFrames** are interchangeable objects! One can easily create one from the other.

Another useful entity in **Spark SQL** is the **View**. It is an object that does not store data; it is just an alias for a query, which can be complex.

Without delay, let us create a **Database** and a **Table** using **Spark SQL**!

## Databases

In [24]:
# from pyspark.sql import SparkSession
# from pyspark.sql.types import *

#     Both imports above are required to create a Database and a Table. First, we start the SparkSession,
#   then we use different types to create and operate on our Database

spark.sql("show databases").show() #This command helps us see the existing Databases

#We can begin by creating a database (づ￣ ³￣)づ
try:
    spark.sql("create database desp") #Notice that the argument passed to the sql method is a SQL script!
except pyspark.sql.utils.AnalysisException:
    print('Unexpected Analysis Exception. Database may already exist')


spark.sql("show databases").show() #Checking if everything worked out well... ~(˘-˘~)

+---------+
|namespace|
+---------+
|  default|
+---------+

+---------+
|namespace|
+---------+
|  default|
|     desp|
+---------+



We want to create a table in our new **Database**. So we will run the command "use (database)" to change the database our queries point to.

In [25]:
spark.sql("use desp")

DataFrame[]

## Tables

Now we will create a table using a previously defined **DF**. It is easy to create a **Table** from a **DF**: we just need to have the **DF** available in our current **SparkSession**, then we write it as a table.

In [26]:
# A notebook related workaround that I'm using to signal AnalysisException
try:
    dispatchers.write.saveAsTable("Dispatchers") 
    print('A new table was added to the current database.')
except pyspark.sql.utils.AnalysisException:
    print('Unexpected Analysis Exception. Table may already exist') 



A new table was added to the current database.


Now we run a query to select data from our new table. Furthermore, we may use the command **show tables** to see every table in the active **Database**.

In [27]:
spark.sql("select * from Dispatchers").show()

spark.sql("show tables").show()

+---+-------------------+------+-------------+-----+----------+
| id|               name|status|         city|sales|      date|
+---+-------------------+------+-------------+-----+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|   45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|   65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|   54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|    0|2020-09-05|
+---+-------------------+------+-------------+-----+----------+

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+--------

You can see above that the **table** object is not temporary, i.e., the table will persist after the end of our **SparkSession**.

The **table** we've just created is known as a **Managed Table**. This means that **Spark** manages both data and metadata; therefore, changes made to the table also happen physically on the stored data.

Now, say we've made some changes to our **DF**, **dispatchers**, and we would like to **overwrite** the existing **table** in our **Database**. For this, we simply add **mode("overwrite")** to the **write** command.

In [28]:
dispatchers_v2.write.mode('overwrite').saveAsTable("Dispatchers")

spark.sql("select * from Dispatchers").show()

+---+-------------------+------+-------------+-----+-------------------+
| id|              names|status|         city|sales|               date|
+---+-------------------+------+-------------+-----+-------------------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05 00:00:00|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|   45|2019-10-05 00:00:00|
|  8|      Roque Vásquez| Ativo| Porto Alegre|   65|2020-03-05 00:00:00|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|   54|2018-05-05 00:00:00|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|    0|2020-09-05 00:00:00|
+---+-------------------+------+-------------+-----
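`mode("overwrite")` is one of several save modes accepted by `DataFrameWriter.mode`; the others are `append`, `ignore`, and `error`/`errorifexists` (the default, which is why the first write cell needed a `try`/`except`). A minimal sketch, using a hypothetical helper `save_table` that rejects misspelled modes before anything is written:

```python
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def save_table(df, table_name, mode="errorifexists"):
    """Write `df` as a table using an explicit, validated save mode."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown save mode: {mode!r}")
    df.write.mode(mode).saveAsTable(table_name)


# Usage in the notebook (assuming the DataFrames defined earlier):
# save_table(dispatchers_v2, "Dispatchers", mode="overwrite")  # replace the table
# save_table(dispatchers, "Dispatchers", mode="ignore")        # do nothing if it exists
```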

Next, we shall create an **External Table** (**Unmanaged**). This requires a path for the **parquet** files. **Spark** does not fully manage this type of structure: it manages only the metadata, while the data stays at the given location. We write the **DF** with **option("path", *absolute_path*)** and save it as a Table in the same command.

In [29]:
dispatchers_v2.write.mode('overwrite').option("path", "/home/corbanez/Documents/PySpark/ExternalTables/dispatchers_v2_nm").saveAsTable("Dispatchers_nm") ##Path must be absolute!

In [30]:
spark.sql("select * from Dispatchers_nm").show()

+---+-------------------+------+-------------+-----+-------------------+
| id|              names|status|         city|sales|               date|
+---+-------------------+------+-------------+-----+-------------------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05 00:00:00|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|   45|2019-10-05 00:00:00|
|  8|      Roque Vásquez| Ativo| Porto Alegre|   65|2020-03-05 00:00:00|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|   54|2018-05-05 00:00:00|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|    0|2020-09-05 00:00:00|
+---+-------------------+------+-------------+-----

In [31]:
df2.write.mode('overwrite').option("path", "/home/corbanez/Documents/PySpark/ExternalTables/ppap_nm").saveAsTable("ppap_nm") #Another example of external table!

spark.sql("select * from ppap_nm").show()

+----------+--------+
|   Product|Quantity|
+----------+--------+
|       Pen|       9|
|Pineappple|      22|
|     Apple|      12|
|       Pen|      13|
+----------+--------+
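Both external-table cells above follow the same pattern: pick a location, pass it as the `path` option, and call `saveAsTable`. A small sketch of that pattern is shown below. The helper name `save_external_table` and the `ExternalTables` directory in the usage line are assumptions for illustration; the helper resolves the location with `os.path.abspath` so the stored path is always absolute.

```python
import os


def save_external_table(df, table_name, directory, mode="overwrite"):
    """Write `df` as an external (unmanaged) table stored under `directory`."""
    # Resolve relative paths so the table location is always absolute.
    location = os.path.abspath(os.path.join(directory, table_name.lower()))
    df.write.mode(mode).option("path", location).saveAsTable(table_name)
    return location


# Usage in the notebook (assuming `df2` from the cell above):
# save_external_table(df2, "ppap_nm", "ExternalTables")
```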



Now, if we run the command **show tables**, we should get two new **tables**.

In [32]:
spark.sql("show tables").show()

+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|     desp|   dispatchers|      false|
|     desp|dispatchers_nm|      false|
|     desp|       ppap_nm|      false|
+---------+--------------+-----------+



However, as previously discussed, there is a difference between them. The table **dispatchers** is a **Managed Table**, while the last two are **External Tables**. One might ask how we differentiate them. The answer is easy: we just need to look at the creation query. **Managed Tables** will not have a location parameter, whereas **External Tables** will have a fixed location.

In [33]:
spark.sql("show create table dispatchers").show(truncate = False) ##Truncate = False enables the printing of the whole string
print(kool_log_separator + '\n')
spark.sql("show create table dispatchers_nm").show(truncate = False)
print(kool_log_separator + '\n')
spark.sql("show create table ppap_nm").show(truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|CREATE TABLE `desp`.`dispatchers` (\n  `id` INT,\n  `names` STRING,\n  `status` STRING,\n  `city` STRING,\n  `sales` INT,\n  `date` TIMESTAMP)\nUSING parquet\n|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------+

-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-

+---------------------------------------------------------------------

Another way to get metadata from these tables is the command **spark.catalog.listTables()**, which reports the **tableType** of each table.

In [34]:
spark.catalog.listTables()

[Table(name='dispatchers', database='desp', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='dispatchers_nm', database='desp', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='ppap_nm', database='desp', description=None, tableType='EXTERNAL', isTemporary=False)]
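Since each entry returned by `spark.catalog.listTables()` carries a `tableType`, the managed/external split can also be read programmatically. A minimal sketch, assuming an active `spark` session; the helper name `tables_by_type` is hypothetical:

```python
from collections import defaultdict


def tables_by_type(spark, database=None):
    """Group table names by their catalog tableType (e.g. MANAGED, EXTERNAL)."""
    grouped = defaultdict(list)
    # With database=None, Spark lists the tables of the current database.
    for table in spark.catalog.listTables(database):
        grouped[table.tableType].append(table.name)
    return dict(grouped)


# Usage in the notebook (assuming `spark` already exists):
# tables_by_type(spark)
# With the listing shown above this gives
# {'MANAGED': ['dispatchers'], 'EXTERNAL': ['dispatchers_nm', 'ppap_nm']}
```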

## Views

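In **SQL**, a **View** is a virtual **table** based on the result set of a query. The fields in a view are fields from one or more real **tables** in the **database**. In **PySpark** one can create a **View** using the command **DataFrame.createOrReplaceTempView(*view_name*)** for temporary **Views**, which are visible only in the current **SparkSession**, or **DataFrame.createOrReplaceGlobalTempView(*view_name*)** for global temporary **Views**, which are shared across sessions of the same application and queried through the **global_temp** database.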

In [35]:
dispatchers.createOrReplaceTempView("vw_dispatchers")

spark.sql("select * from vw_dispatchers").show() #Select of the view we've created

spark.sql("show views").show() #Command to get all temporary views from our database

df2.createOrReplaceGlobalTempView("vw_ppap_nm")

spark.sql("select * from global_temp.vw_ppap_nm").show() # Global temporary views are queried through the global_temp database

+---+-------------------+------+-------------+-----+----------+
| id|               name|status|         city|sales|      date|
+---+-------------------+------+-------------+-----+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|   45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|   65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|   54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|    0|2020-09-05|
+---+-------------------+------+-------------+-----+----------+

+---------+--------------+-----------+
|namespace|      viewName|isTemporary|
+---------+--------------
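The practical difference between the two kinds of view shows up when a second session is opened: `spark.newSession()` returns a session that shares global temporary views but has its own temporary views. The sketch below checks this; the helper name `view_visible_in_new_session` is hypothetical, and catching a broad `Exception` is a simplification (Spark raises an `AnalysisException` when the view cannot be resolved).

```python
def view_visible_in_new_session(spark, qualified_name):
    """Return True if `qualified_name` can be queried from a fresh session."""
    other = spark.newSession()
    try:
        other.sql(f"SELECT 1 FROM {qualified_name} LIMIT 1").collect()
        return True
    except Exception:  # simplification: Spark raises AnalysisException here
        return False


# Usage in the notebook (assuming the views created above):
# view_visible_in_new_session(spark, "vw_dispatchers")          # temp view: should be False
# view_visible_in_new_session(spark, "global_temp.vw_ppap_nm")  # global temp view: should be True
```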

We can also create **Views** using regular **SQL**!

In [36]:
spark.sql("create or replace temp view vw_dispatchers_v2 as select * from dispatchers_nm").show()

spark.sql("show views").show()

spark.sql("select * from vw_dispatchers_v2").show()

++
||
++
++

+---------+-----------------+-----------+
|namespace|         viewName|isTemporary|
+---------+-----------------+-----------+
|         |   vw_dispatchers|       true|
|         |vw_dispatchers_v2|       true|
+---------+-----------------+-----------+

+---+-------------------+------+-------------+-----+-------------------+
| id|              names|status|         city|sales|               date|
+---+-------------------+------+-------------+-----+-------------------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05 00:00:00|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|   45|2019-10-05 00:00:00|
|  8

## Joins

Given that almost all modern **Databases** are normalized, **Joins** are among the most used **SQL clauses**. **PySpark** has this transformation built into the API, and it is also possible to make **Joins** with **PySpark SQL**. Let us take a deeper look at how we can make **Joins**, and at the ways to do so.

We have been using data from the file **despachantes.csv**. This data is related by a key to the file **reclamacoes.csv**. See the schema below:

![Table Scheme](images/join_example.png "Table Scheme")

We begin by loading the data from this new table into our environment.

In [37]:
complains = spark.read.load('Files/reclamacoes.csv', header = False, format = 'csv', sep = ',', inferSchema = True)
complains = complains.withColumnRenamed('_c0', 'id_c').withColumnRenamed('_c1', 'date_c').withColumnRenamed('_c2', 'id_dispatcher')

complains.write.mode('overwrite').saveAsTable('Complains')

spark.sql('select * from Complains').show()

spark.sql('select * from Dispatchers').show()

+----+----------+-------------+
|id_c|    date_c|id_dispatcher|
+----+----------+-------------+
|   1|2020-09-12|            2|
|   2|2020-09-11|            2|
|   3|2020-10-05|            4|
|   4|2020-10-02|            5|
|   5|2020-12-06|            5|
|   6|2020-01-09|            5|
|   7|2020-01-05|            9|
+----+----------+-------------+

+---+-------------------+------+-------------+-----+-------------------+
| id|              names|status|         city|sales|               date|
+---+-------------------+------+-------------+-----+-------------------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05 00:00:00|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|   22|2019-01-05 

With both **tables** present in our **Database**, we can perform **Join** operations as needed.

In [38]:
spark.sql('select complains.*, dispatchers.names from complains left join dispatchers on dispatchers.id = complains.id_dispatcher').show()

+----+----------+-------------+-------------------+
|id_c|    date_c|id_dispatcher|              names|
+----+----------+-------------+-------------------+
|   1|2020-09-12|            2|    Deolinda Vilela|
|   2|2020-09-11|            2|    Deolinda Vilela|
|   3|2020-10-05|            4|Felisbela Dornelles|
|   4|2020-10-02|            5|     Graça Ornellas|
|   5|2020-12-06|            5|     Graça Ornellas|
|   6|2020-01-09|            5|     Graça Ornellas|
|   7|2020-01-05|            9|      Uriel Queiroz|
+----+----------+-------------+-------------------+
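The query above uses a left join, but the same statement works with the other standard join types. As a rough sketch, the hypothetical helper `complaints_join_sql` below builds the same query for `inner`, `left`, `right`, and `full` joins so the results can be compared side by side. The table and column names are taken from the cell above; the helper itself is an assumption.

```python
JOIN_KEYWORDS = {
    "inner": "inner join",
    "left": "left join",
    "right": "right join",
    "full": "full outer join",
}


def complaints_join_sql(how="left"):
    """Build the complaints/dispatchers query for the requested join type."""
    if how not in JOIN_KEYWORDS:
        raise ValueError(f"unsupported join type: {how!r}")
    return (
        "select complains.*, dispatchers.names from complains "
        f"{JOIN_KEYWORDS[how]} dispatchers "
        "on dispatchers.id = complains.id_dispatcher"
    )


# Usage in the notebook (assuming `spark` and both tables exist):
# for how in JOIN_KEYWORDS:
#     spark.sql(complaints_join_sql(how)).show()
```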



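One could also use the **Spark API** to perform these **Joins** directly on **DFs**. Note that this cell joins the original **dispatchers** **DF**, so it selects the dispatcher's **date** and **name** columns rather than **date_c** and **names**.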

In [39]:
complains.join(dispatchers, dispatchers.id == complains.id_dispatcher, "left").select('id_c', 'date', 'id_dispatcher', 'name').show()

+----+----------+-------------+-------------------+
|id_c|      date|id_dispatcher|               name|
+----+----------+-------------+-------------------+
|   1|2020-03-05|            2|    Deolinda Vilela|
|   2|2020-03-05|            2|    Deolinda Vilela|
|   3|2020-02-05|            4|Felisbela Dornelles|
|   4|2020-02-05|            5|     Graça Ornellas|
|   5|2020-02-05|            5|     Graça Ornellas|
|   6|2020-02-05|            5|     Graça Ornellas|
|   7|2018-05-05|            9|      Uriel Queiroz|
+----+----------+-------------+-------------------+



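We can run two separate cells without the **.show()** action to compare the two methods. Keep in mind that, because of Lazy Evaluation, without an action these cells time only the construction of the query plan, not the execution of the **Join** itself. Given the usual volume of modern **Databases**, performance matters, so a fair comparison between **DFs** and **Tables** should include an action.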

In [40]:
%%timeit
##Using API. (༎ຶ Д༎ຶ`) WHY SOO UGLY?!
complains.join(dispatchers, dispatchers.id == complains.id_dispatcher, "left").select('id_c', 'date', 'id_dispatcher', 'name')

7.82 ms ± 1.06 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [41]:
%%timeit
##Using SQL. (づ￣ ³￣)づ MUCH FAST! MUCH CLEAN!!
spark.sql('select complains.*, dispatchers.names from complains left join dispatchers on dispatchers.id = complains.id_dispatcher')

2.46 ms ± 371 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
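Because transformations are lazy, the two `%%timeit` cells above measure how long it takes to build each query, not how long the join takes to run. A minimal sketch of a fairer comparison is shown below: it calls an action (`collect()`) inside the timed region. The helper name `time_with_action` is hypothetical, and `collect()` is only reasonable here because the tables are tiny.

```python
import time


def time_with_action(build_df, repeats=3):
    """Time `build_df()` followed by an action, so the query really executes."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        build_df().collect()  # the action triggers execution of the lazy plan
        durations.append(time.perf_counter() - start)
    return durations


# Usage in the notebook (assuming `spark`, `complains` and `dispatchers` exist):
# api_times = time_with_action(lambda: complains.join(
#     dispatchers, dispatchers.id == complains.id_dispatcher, "left"))
# sql_times = time_with_action(lambda: spark.sql(
#     "select complains.*, dispatchers.names from complains "
#     "left join dispatchers on dispatchers.id = complains.id_dispatcher"))
```

Calling `.explain()` on both DataFrames is another way to compare them, since it prints the physical plan Spark would run for each.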


## Activity

Given the same schema from the last activity, create a **Database** called **retail_sales**. Then, load all data from the folder **Files/Atividades/** as individual **tables**. Finally, write a **SQL query** that returns the following columns: **Client Name**, **Sale Date**, **Product**, **Seller**, and **Value**.

![Table Scheme](images/activity_schema.png "Table Scheme")

We begin by creating a **Database** with the complete data schema seen above. First, we load all required data (remember that the Sales and Clients data were already loaded).

In [42]:
products = spark.read.format('parquet').load('Files/Atividades/Produtos.parquet')

sallers = spark.read.format('parquet').load('Files/Atividades/Vendedores.parquet')

itens_sales = spark.read.format('parquet').load('Files/Atividades/ItensVendas.parquet')

#clients => Clientes.parquet
#sales => Vendas.parquet

Second, we create the database **retail_sales**, as required.

In [43]:
try:
    spark.sql("create database retail_sales")
except pyspark.sql.utils.AnalysisException: 
    print('Unexpected Analysis Exception. Database may already exist')  
    
##Remember to select the database you want to use!!

spark.sql("use retail_sales")

DataFrame[]

Finally, we write all **DFs** as tables in our newly created **Database**, **retail_sales**.

In [44]:
products.write.mode('overwrite').saveAsTable('Products')
sallers.write.mode('overwrite').saveAsTable('Sallers')
itens_sales.write.mode('overwrite').saveAsTable('itens_sales')
sales.write.mode('overwrite').saveAsTable('Sales')
clients.write.mode('overwrite').saveAsTable('Clients')

spark.sql('show tables').show()

+------------+-----------------+-----------+
|   namespace|        tableName|isTemporary|
+------------+-----------------+-----------+
|retail_sales|          clients|      false|
|retail_sales|      itens_sales|      false|
|retail_sales|         products|      false|
|retail_sales|            sales|      false|
|retail_sales|          sallers|      false|
|            |   vw_dispatchers|       true|
|            |vw_dispatchers_v2|       true|
+------------+-----------------+-----------+



In [52]:
##It seems reasonable to write the SQL script in a different IDE and load the file into Python for more concise code

spark.sql('''
    select clients.Cliente, sales.Data, itens_sales.ValorTotal, products.Produto, sallers.Vendedor
    from clients
    left join sales on sales.ClienteID = clients.ClienteID
    left join itens_sales on sales.VendasID = itens_sales.VendasID
    left join products on itens_sales.ProdutoID = products.ProdutoID
    left join sallers on sallers.VendedorID = sales.VendedorID
''').show()

+--------------------+----------+----------+--------------------+----------------+
|             Cliente|      Data|ValorTotal|             Produto|        Vendedor|
+--------------------+----------+----------+--------------------+----------------+
|Adelina Buenaventura|13/12/2019|    114.75|Camiseta Predacto...|Jéssica Castelão|
|Adelina Buenaventura|13/12/2019|     103.5|Bermuda Predactor...|Jéssica Castelão|
|Adelina Buenaventura|13/12/2019|   2268.99|Bicicleta Gometws...|Jéssica Castelão|
|Adelina Buenaventura|13/12/2019|    6892.2|Bicicleta Trinc C...|Jéssica Castelão|
|        Adelino Gago| 22/8/2020|    118.58|Capacete Gometws ...|   Daniel Pirajá|
|        Adelino Gago| 22/8/2020|     188.0|Luva De Ciclismo ...|   Daniel Pirajá|
|        Adelino Gago| 22/8/2020|    2521.1|Bicicleta Gometws...|   Daniel Pirajá|
|        Adelino Gago| 22/8/2020|    2955.0|Bicicleta Gometws...|   Daniel Pirajá|
|     Adolfo Patrício| 7/11/2020|     139.5|Capacete Gometws ...|  Hélio Liberato|
|   
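The comment in the cell above suggests keeping the query in its own file. One way this could look is sketched below. The helper name `run_sql_file` and the file name in the usage line are hypothetical, and the sketch assumes the file holds a single statement, since `spark.sql` runs one statement per call.

```python
from pathlib import Path


def run_sql_file(spark, path):
    """Read a single SQL statement from `path` and run it with spark.sql()."""
    # Drop surrounding whitespace and a trailing semicolon, if present.
    query = Path(path).read_text(encoding="utf-8").strip().rstrip(";").strip()
    return spark.sql(query)


# Usage in the notebook (assuming `spark` exists and the file was written beforehand):
# run_sql_file(spark, "queries/retail_sales_report.sql").show()
```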

# Exploring Data Sources

One can also import data from different data sources. Besides importing, we can load the data as **DFs**, save it as **Tables**, and write it back to the original data source.

## PostgreSQL

You should install **PostgreSQL** on your machine for this example. Run the command **sudo apt-get install postgresql-12**. Furthermore, you will need a **PostgreSQL Database** already set up. For further reading, follow the article: ***https://www.digitalocean.com/community/tutorials/how-to-install-and-use-postgresql-on-ubuntu-18-04-pt***