# Big Data for Data Scientists - From Mega to Giga 

In this post we are going to learn some tools for working with Big Data, which differs a bit from the usual pandas-and-CSV workflow.
Working with Big Data usually demands more sophisticated software that can handle the upload and storage of gigabyte-sized files. This was a great module from the "Stack Academy - Big Data for Data Scientist" course and, given how important and rich its content is, I decided to bring it to my portfolio.

Without further ado, some tools we are going to use are:  

- **Databricks**: Azure Databricks is a fully managed service that provides powerful ETL (Extract, Transform and Load), analytics, and machine learning capabilities. It offers a simple collaborative environment to run interactive and scheduled data analysis workloads.   
 
- **PySpark**: PySpark is the Python API for Apache Spark. Spark is a distributed computing engine for Big Data, and PySpark lets us drive it from Python.

- **SQL**: SQL (Structured Query Language) is a language designed for querying and managing data in relational databases.

In [None]:
So, let's start with small .json files

In [None]:
# Reading the data file
arquivo = "/FileStore/tables/shit/2015_summary.json"

**inferSchema** = True. 

A column can be of type String, Double, Long, etc.  Using inferSchema=false (default option) will give a dataframe   
where all columns are strings (StringType). Depending on what you want to do, strings may not work.  
For example, if you want to add numbers from different columns, then those columns should be of some numeric type.    

**header** = True. 

This will use the first row in the csv file as the dataframe's column names.  
Setting header=false (default option) will result in a dataframe with default column names: _c0, _c1, _c2, etc.
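To get a feel for what `inferSchema` does, here is a simplified pure-Python sketch (not Spark's actual implementation): it scans the raw string values of a column and picks the narrowest type that fits all of them. The helper `infer_type` and the sample values are illustrative only, and only integer, double and string are considered; Spark's real CSV inference also recognizes other types such as booleans and timestamps.

```python
# Simplified illustration of CSV schema inference: try the narrowest type first
# and fall back to string. Assumption: only integer, double and string exist here.


def infer_type(values):
    """Return 'integer', 'double' or 'string' for a list of raw CSV strings."""
    def fits(cast):
        for v in values:
            try:
                cast(v)
            except ValueError:
                return False
        return True

    if fits(int):
        return "integer"
    if fits(float):
        return "double"
    return "string"


columns = {
    "Quantity": ["6", "8", "2"],
    "UnitPrice": ["2.55", "3.39", "7.65"],
    "StockCode": ["85123A", "71053", "84406B"],
}
for name, raw in columns.items():
    print(name, infer_type(raw))
# Quantity integer
# UnitPrice double
# StockCode string
```

Without inference, every one of these columns would simply stay a string.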

In [None]:
# The file is JSON, so use the JSON reader: it infers the schema on its own
# ("inferSchema" and "header" are CSV-only options and would be ignored here).
flightData2015 = spark\
.read\
.json(arquivo)

In [None]:
# print the datatypes from the df columns
flightData2015.printSchema()

In [None]:
# check the Python type of the object (a pyspark.sql.DataFrame)
type(flightData2015)

In [None]:
# return the first 5 rows as a list of Row objects
flightData2015.take(5)

In [None]:
# show() prints the rows itself and returns None, so it should not be wrapped in display()
flightData2015.show(3)

In [None]:
flightData2015.count()

In [None]:
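# Note: inferSchema only applies to CSV sources. The JSON reader ignores it and still
# scans the data to infer the schema, so this option has no effect here.
# To skip inference for JSON, pass an explicit schema with .schema(...).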

flightData2015 = spark\
.read\
.option("inferSchema", "False")\
.option("header", "True")\
.json(arquivo)

In [None]:
# reading all the json files in a directory created on Databricks (sorry about the folder name; I couldn't rename it once it was created)

df = spark\
.read\
.option("inferSchema", "True")\
.option("header", "True")\
.json("/FileStore/tables/shit/*.json")

In [None]:
df.show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



In [None]:
df.count()

In [None]:
display(df.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


### Now we are going to work with SQL

#### In a Databricks notebook, a cell written in a language other than the notebook's default (here, Python) must start with a magic command such as `%sql`.

In [None]:
%sql 
DROP TABLE IF EXISTS all_files;

-- drop all_files if it already exists, because the next cell creates a table with that same name

In [None]:
%sql
CREATE TABLE all_files
USING json
OPTIONS (path "/FileStore/tables/shit/*.json", header "true")

-- the table "all_files" is created from the JSON files in the "shit" folder

### Querying the data

In [None]:
%sql
SELECT * FROM all_files;

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [None]:
%sql
SELECT count(*) FROM all_files;

count(1)
1502


In [None]:
%sql
-- average flight count per destination country

SELECT DEST_COUNTRY_NAME
       ,avg(count) AS Quantidade_Paises
FROM all_files
GROUP BY DEST_COUNTRY_NAME
ORDER BY DEST_COUNTRY_NAME;

In [None]:
from pyspark.sql.functions import max as spark_max  # avoid shadowing Python's built-in max
df.select(spark_max("count")).take(1)

Out[97]: [Row(max(count)=370002)]

In [None]:
# Filtering rows with filter()
df.filter("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [None]:
# where() is an alias for filter()
df.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



### Manipulating Dataframes

In [None]:
df.sort("count").show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|Saint Vincent and...|      United States|    1|
|              Malawi|      United States|    1|
|            Slovakia|      United States|    1|
|          Kazakhstan|      United States|    1|
|       United States|       Saint Martin|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [None]:
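from pyspark.sql.functions import desc, asc, expr

# Caution: expr("count desc") is parsed as the column `count` aliased as `desc`,
# so this actually sorts in ASCENDING order, as the output below shows.
# For a descending sort use df.orderBy(desc("count")) or df.orderBy(col("count").desc()).
df.orderBy(expr("count desc")).show(10)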

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|         Suriname|      United States|    1|
|    United States|             Cyprus|    1|
|    United States|          Gibraltar|    1|
|           Cyprus|      United States|    1|
|          Moldova|      United States|    1|
|     Burkina Faso|      United States|    1|
|    United States|            Croatia|    1|
|         Djibouti|      United States|    1|
|           Zambia|      United States|    1|
|    United States|            Estonia|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



In [None]:
# descriptive statistics
df.describe().show()

+-------+-----------------+-------------------+------------------+
|summary|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|             count|
+-------+-----------------+-------------------+------------------+
|  count|             1502|               1502|              1502|
|   mean|             null|               null|1718.3189081225032|
| stddev|             null|               null|22300.368619668894|
|    min|      Afghanistan|        Afghanistan|                 1|
|    max|         Zimbabwe|           Zimbabwe|            370002|
+-------+-----------------+-------------------+------------------+



In [None]:
from pyspark.sql.functions import lower, upper, col
df.select(col("DEST_COUNTRY_NAME"),lower(col("DEST_COUNTRY_NAME")),upper(lower(col("DEST_COUNTRY_NAME")))).show(10)

+-----------------+------------------------+-------------------------------+
|DEST_COUNTRY_NAME|lower(DEST_COUNTRY_NAME)|upper(lower(DEST_COUNTRY_NAME))|
+-----------------+------------------------+-------------------------------+
|    United States|           united states|                  UNITED STATES|
|    United States|           united states|                  UNITED STATES|
|    United States|           united states|                  UNITED STATES|
|            Egypt|                   egypt|                          EGYPT|
|    United States|           united states|                  UNITED STATES|
|    United States|           united states|                  UNITED STATES|
|    United States|           united states|                  UNITED STATES|
|       Costa Rica|              costa rica|                     COSTA RICA|
|          Senegal|                 senegal|                        SENEGAL|
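|          Moldova|                 moldova|                        MOLDOVA|
+-----------------+------------------------+-------------------------------+
only showing top 10 rows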

In [None]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Azerbaijan|      United States|    1|
|          Belarus|      United States|    1|
|          Belarus|      United States|    1|
|           Brunei|      United States|    1|
|         Bulgaria|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [None]:
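from pyspark.sql.functions import col, expr
# expr("count desc") aliases the column instead of sorting it, so this is still ascending
df.orderBy(expr("count desc")).show(2)
# the Column API sorts as intended: count descending, then destination name ascending
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)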

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|      United States|358354|
+-----------------+-------------------+------+
only showing top 2 rows



### Reading modes
- **permissive**: *Sets the fields it cannot parse to null and, if the schema has a `_corrupt_record` column, stores the raw malformed record there* (default).

- **dropMalformed**: *Drops rows that contain malformed records.*

- **failFast**: *Fails immediately when it encounters a malformed record.*
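The three modes can be pictured with a toy pure-Python reader. This is only a sketch of the documented behaviour under simplifying assumptions (a hypothetical two-column integer schema, and a `_corrupt_record` field that is always present), not how Spark implements it internally.

```python
import csv
import io

# Hypothetical schema: (column name, conversion function)
SCHEMA = [("StockCode", int), ("Quantity", int)]
MODES = {"permissive", "dropmalformed", "failfast"}


def read_csv(text, mode="permissive"):
    mode = mode.lower()  # Spark's mode names are case-insensitive
    if mode not in MODES:
        raise ValueError(f"Unknown mode: {mode}")
    rows = []
    for line in csv.reader(io.StringIO(text)):
        row, malformed = {}, False
        for (name, cast), raw in zip(SCHEMA, line):
            try:
                row[name] = cast(raw)
            except ValueError:
                row[name] = None  # the field that cannot be parsed becomes null
                malformed = True
        if not malformed:
            row["_corrupt_record"] = None
            rows.append(row)
        elif mode == "permissive":
            row["_corrupt_record"] = ",".join(line)  # keep the raw record
            rows.append(row)
        elif mode == "dropmalformed":
            continue  # silently skip the bad row
        else:  # failfast
            raise ValueError(f"Malformed record: {line}")
    return rows


data = "71053,6\n85123A,6\n22752,2\n"
print(len(read_csv(data, "permissive")))     # 3
print(len(read_csv(data, "dropMalformed")))  # 2
try:
    read_csv(data, "failFast")
except ValueError as err:
    print("failfast:", err)
```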

In [None]:
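# Reading csv: generic template (replace the path and schema with your own)
from pyspark.sql.types import StructType, StructField, StringType

someSchema = StructType([StructField("value", StringType())])  # placeholder schema

# inferSchema is ignored when an explicit schema is supplied
spark.read.format("csv")\
.option("mode", "permissive")\
.option("inferSchema", "true")\
.option("path", "path/to/file(s)")\
.schema(someSchema)\
.load()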

In [None]:
df = spark.read.format("csv")\
.option("mode", "permissive")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")

In [None]:
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01 08:26:00,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01 08:26:00,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01 08:34:00,1.69,13047.0,United Kingdom


#### Creating a schema
- The **inferSchema** option will not always pick the best data type.
- An explicit schema improves performance when reading large datasets, since Spark skips the extra pass over the data needed for inference.
- It allows customizing the column types.
- It's an important skill when rewriting applications (e.g. pandas code) in PySpark.

In [None]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
# A StructType lets us define the type of each column. InvoiceNo was read as a string, and now I want PySpark to read it as an int

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType
schema_df = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", IntegerType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", TimestampType()),
    StructField("UnitPrice", DoubleType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])

In [None]:
# checking the schema_df type
type(schema_df)

Out[114]: pyspark.sql.types.StructType

In [None]:
# passing the schema to the reader with .schema()

df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.option("timestampFormat",'yyyy-MM-dd HH:mm:ss')\
.load("/FileStore/tables/bronze/2010_12_01.csv")

In [None]:
df.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
display(df.head(10))

# StockCode was declared as integer, but some codes mix digits and letters (e.g. 85123A), so they can't be cast to int.
# In permissive mode those values are shown as null.

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365,71053.0,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365,,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,22752.0,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365,21730.0,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366,22633.0,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366,22632.0,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367,84879.0,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


### What if we switch our reading mode?

In [None]:
# Switching to failfast instead of permissive

df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.option("mode","failfast")\
.option("timestampFormat",'yyyy-MM-dd HH:mm:ss')\
.load("/FileStore/tables/bronze/2010_12_01.csv")

In [None]:
# StockCode is still declared as int. The schema is applied lazily, so no error is raised yet
df.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
display(df.collect()) # Raises an error: failfast aborts on the first malformed record (e.g. StockCode 85123A), which permissive mode would turn into null

### JSON Files

In [None]:
df_json = spark.read.format("json")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/FileStore/tables/shit/2010_summary.json")

In [None]:
df_json.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [None]:
display(df_json.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


### Writing files
- **append** : Adds output files to the list of files that already exist in the location.
- **overwrite** : Overwrite files on destination.
- **errorIfExists** : Raises an error and stops if files already exist in the destination (default).
- **ignore** : If there is data already in the destination, it does nothing.
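A toy pure-Python model of the four save modes, assuming (like Spark) that the output path is a directory of part files. The `save` helper, the part-file names and the sample lines are hypothetical; Spark's real writer also deals with partitioning, commit protocols and marker files such as `_SUCCESS`.

```python
import os
import tempfile

VALID_MODES = {"append", "overwrite", "error", "errorifexists", "ignore"}


def save(directory, lines, mode="errorifexists"):
    """Toy writer: stores `lines` as a new part file inside `directory`."""
    mode = mode.lower()
    if mode not in VALID_MODES:
        raise ValueError(f"Unknown save mode: {mode}")
    exists = os.path.exists(directory)
    if exists and mode in ("error", "errorifexists"):
        raise FileExistsError(f"Path already exists: {directory}")
    if exists and mode == "ignore":
        return  # leave the existing data untouched
    if exists and mode == "overwrite":
        for name in os.listdir(directory):
            os.remove(os.path.join(directory, name))
    os.makedirs(directory, exist_ok=True)
    part_name = f"part-{len(os.listdir(directory)):05d}.csv"
    with open(os.path.join(directory, part_name), "w") as f:
        f.writelines(line + "\n" for line in lines)


out = os.path.join(tempfile.mkdtemp(), "saida")
save(out, ["536365,6"])                    # first write creates the directory
save(out, ["536366,6"], mode="append")     # adds a second part file
save(out, ["536367,32"], mode="ignore")    # no-op: data already exists
print(sorted(os.listdir(out)))             # ['part-00000.csv', 'part-00001.csv']
save(out, ["536368,1"], mode="overwrite")  # replaces all existing part files
print(sorted(os.listdir(out)))             # ['part-00000.csv']
```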

In [None]:
# writing csv (include the header so the output can be read back with header=True)
df.write.format("csv")\
.mode("overwrite") \
.option("sep", ",") \
.option("header", "true") \
.save("/FileStore/tables/bronze/saida_2010_12_01.csv")

In [None]:
# Spark writes a directory of part files: read the directory instead of
# hard-coding one part file name (which changes on every write)
file = "/FileStore/tables/bronze/saida_2010_12_01.csv"
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema", "True")\
.option("timestampFormat",'yyyy-MM-dd HH:mm:ss')\
.load(file)

In [None]:
df.show(10)

#### Parallel writing

In [None]:
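# repartition(5) splits the data into 5 partitions; Spark writes one part file
# per partition inside the output directory, replacing the previous output
df.repartition(5).write.format("csv")\
.mode("overwrite") \
.option("sep", ",") \
.option("header", "true") \
.save("/FileStore/tables/bronze/saida_2010_12_01.csv")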
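Called without column arguments, `repartition(n)` spreads the rows evenly across `n` partitions in round-robin fashion, and each partition ends up as one part file. Below is a minimal pure-Python sketch of that idea, assuming a plain list of hypothetical rows. Spark chooses its own starting partition, while this sketch always starts at partition 0 for reproducibility.

```python
def repartition_round_robin(rows, num_partitions):
    """Distribute rows over num_partitions lists in round-robin order."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


rows = [f"row-{i}" for i in range(12)]  # 12 hypothetical rows
parts = repartition_round_robin(rows, 5)
for i, part in enumerate(parts):
    # each partition would be written as its own part file
    print(f"part-{i:05d}.csv", len(part))
```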

### Parquet File
  
Parquet is a column-oriented file format with optimized compression.  
It also supports dictionary encoding.
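Because Parquet stores each column separately, repeated values within a column can be stored very compactly. Dictionary encoding keeps each distinct value once and replaces the column's values with small integer codes. The sketch below is a simplified pure-Python illustration with made-up sample values. Real Parquet writers additionally apply run-length/bit-packing to the codes and a compression codec (Snappy in the files written below).

```python
def dictionary_encode(values):
    """Return (dictionary of distinct values, list of integer codes)."""
    dictionary, codes, index = [], [], {}
    for v in values:
        if v not in index:
            index[v] = len(dictionary)
            dictionary.append(v)
        codes.append(index[v])
    return dictionary, codes


def dictionary_decode(dictionary, codes):
    return [dictionary[c] for c in codes]


country = ["United Kingdom", "United Kingdom", "France",
           "United Kingdom", "Germany", "France"]
d, c = dictionary_encode(country)
print(d)  # ['United Kingdom', 'France', 'Germany']
print(c)  # [0, 0, 1, 0, 2, 1]
```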

##### **Converting .csv to .parquet**
- CSV dataset used: https://www.kaggle.com/nhs/general-practice-prescribing-data

In [None]:
# Reading the csv files (>4GB)
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.load("/FileStore/tables/bigdata/*.csv")

In [None]:
display(df.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
5668,8092,592,2,44.1,40.84,189
1596,17512,16983,2,1.64,1.64,35
1596,25587,16124,1,1.26,1.28,42
1596,12551,1282,2,0.86,1.02,42
1596,18938,10575,1,1.85,1.82,56
1596,8777,21507,1,3.31,3.18,56
1596,9369,12008,1,63.15,58.56,56
1596,27926,17643,2,158.66,147.07,56
1596,26148,10230,1,0.35,0.44,14
1596,9148,3381,1,0.26,0.35,7


In [None]:
df.printSchema()

root
 |-- practice: string (nullable = true)
 |-- bnf_code: string (nullable = true)
 |-- bnf_name: string (nullable = true)
 |-- items: string (nullable = true)
 |-- nic: string (nullable = true)
 |-- act_cost: string (nullable = true)
 |-- quantity: string (nullable = true)



In [None]:
df.count()

Out[5]: 131020089

In [None]:
# writing a Parquet file
# mind the Spark version: read the Parquet files with the same (or a compatible) Spark version that wrote them
df.write.format("parquet")\
.mode("overwrite")\
.save("/FileStore/tables/bronze/df-parquet-file.parquet")

In [None]:
%fs
ls /FileStore/tables/bronze/df-parquet-file.parquet

path,name,size
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_7787927191925526716,_committed_7787927191925526716,3689
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_started_7787927191925526716,_started_7787927191925526716,0
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00000-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-65-1-c000.snappy.parquet,part-00000-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-65-1-c000.snappy.parquet,43404546
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00001-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-66-1-c000.snappy.parquet,part-00001-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-66-1-c000.snappy.parquet,43332747
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00002-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-67-1-c000.snappy.parquet,part-00002-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-67-1-c000.snappy.parquet,43509266
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00003-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-68-1-c000.snappy.parquet,part-00003-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-68-1-c000.snappy.parquet,43363786
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00004-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-69-1-c000.snappy.parquet,part-00004-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-69-1-c000.snappy.parquet,43338818
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00005-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-70-1-c000.snappy.parquet,part-00005-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-70-1-c000.snappy.parquet,43321014
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00006-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-71-1-c000.snappy.parquet,part-00006-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-71-1-c000.snappy.parquet,43388335


In [None]:
# reading parquet, way faster than csv and json
# csv took 5min, parquet 96s
df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/bronze/df-parquet-file.parquet")

In [None]:
# csv took 2min, parquet 10s
df_parquet.count()

Out[7]: 131020089

In [None]:
display(df_parquet.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
3626,12090,20521,3,8.4,7.82,168
3626,23511,11576,1,32.18,29.81,28
3626,14802,14672,162,141.13,133.93,4760
3626,14590,10011,17,15.01,14.12,532
3626,24483,13726,69,57.57,54.67,2121
3626,7768,22070,155,113.03,109.41,4144
3626,1877,13598,102,68.5,67.4,2370
3626,18110,3990,189,156.66,150.44,5222
3626,14058,2144,23,23.52,22.48,588
3626,4558,5695,32,116.64,109.21,756


In [None]:
# checking files size
display(dbutils.fs.ls("/FileStore/tables/bronze/df-parquet-file.parquet"))

path,name,size
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_7787927191925526716,_committed_7787927191925526716,3689
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_started_7787927191925526716,_started_7787927191925526716,0
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00000-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-65-1-c000.snappy.parquet,part-00000-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-65-1-c000.snappy.parquet,43404546
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00001-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-66-1-c000.snappy.parquet,part-00001-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-66-1-c000.snappy.parquet,43332747
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00002-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-67-1-c000.snappy.parquet,part-00002-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-67-1-c000.snappy.parquet,43509266
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00003-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-68-1-c000.snappy.parquet,part-00003-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-68-1-c000.snappy.parquet,43363786
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00004-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-69-1-c000.snappy.parquet,part-00004-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-69-1-c000.snappy.parquet,43338818
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00005-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-70-1-c000.snappy.parquet,part-00005-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-70-1-c000.snappy.parquet,43321014
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00006-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-71-1-c000.snappy.parquet,part-00006-tid-7787927191925526716-636bf920-d1cf-40ad-8e6b-0c5f2332ad2a-71-1-c000.snappy.parquet,43388335


In [None]:
%scala
// script to get the total size of the Parquet directory in GB
val path="/FileStore/tables/bronze/df-parquet-file.parquet"
val filelist=dbutils.fs.ls(path)
val df_temp = filelist.toDF()
df_temp.createOrReplaceTempView("adlsSize")

In [None]:
%sql
-- query the created view
-- csv: ~4 GB / parquet: ~1.3 GB
select round(sum(size)/(1024*1024*1024),3) as sizeInGB from adlsSize

sizeInGB
1.308
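The size query is plain arithmetic: add up the byte sizes of the files and divide by 1024³. A minimal Python equivalent, with hypothetical file sizes for illustration:

```python
def total_size_gb(sizes_in_bytes, digits=3):
    """Sum file sizes (bytes) and convert to GB using 1024**3, rounded like the SQL query."""
    return round(sum(sizes_in_bytes) / (1024 * 1024 * 1024), digits)


# Hypothetical sizes: one 1 GB file, one 512 MB file and an empty _SUCCESS marker
print(total_size_gb([1024**3, 512 * 1024**2, 0]))  # 1.5
```

In a Databricks notebook, the sizes could come from the `size` field of the listing, e.g. `[f.size for f in dbutils.fs.ls(path)]` (note that `ls` is not recursive).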


### Spark + PostgreSQL
- Querying and writing to a relational database

In [None]:
# Equivalent to running: select * from pg_catalog.pg_tables
# URL template: jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/{your_database}?user=stack_user@pgserver-1&password={your_password}&sslmode=require
# The Azure PostgreSQL server must also be configured to accept remote connections (firewall rules)

pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password={your_password}&sslmode=require")\
.option("dbtable", "pg_catalog.pg_tables")\
.option("user", "stack_user").option("password", "{your_password}").load()

# dbtable: the table Spark reads in full, i.e. the equivalent of 'select * from pg_catalog.pg_tables'
# user / password: the credentials used to connect

In [None]:
# collect all the rows and display them
display(pgDF.collect())

schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
public,produtos,stack_user,,False,False,False,False
pg_catalog,pg_statistic,azure_superuser,,True,False,False,False
pg_catalog,pg_foreign_table,azure_superuser,,True,False,False,False
pg_catalog,pg_authid,azure_superuser,pg_global,True,False,False,False
pg_catalog,pg_user_mapping,azure_superuser,,True,False,False,False
pg_catalog,pg_subscription,azure_superuser,pg_global,True,False,False,False
pg_catalog,pg_largeobject,azure_superuser,,True,False,False,False
pg_catalog,pg_type,azure_superuser,,True,False,False,False
pg_catalog,pg_attribute,azure_superuser,,True,False,False,False
pg_catalog,pg_proc,azure_superuser,,True,False,False,False


In [None]:
# distinct values of schemaname
pgDF.select("schemaname").distinct().show()

+------------------+
|        schemaname|
+------------------+
|information_schema|
|            public|
|        pg_catalog|
+------------------+



In [None]:
# Specific query
# Useful to avoid "select * from ..." on the whole table
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password={your_password}&sslmode=require")\
.option("query", "select schemaname,tablename from pg_catalog.pg_tables")\
.option("user", "stack_user").option("password", "{your_password}").load()

# query: the database runs this SQL and only its result (the selected columns) is sent to Spark.
# Note: the "query" and "dbtable" options cannot be used together.
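To see the difference between reading a whole table and sending a specific query, here is a small sketch that uses Python's built-in `sqlite3` as a hypothetical stand-in for PostgreSQL, with a made-up `pg_tables` table. It only illustrates the idea of letting the database do the projection; Spark can also prune columns you select later from a `dbtable` read, so treat this as an illustration rather than a performance claim.

```python
import sqlite3

# Hypothetical stand-in for pg_catalog.pg_tables, using an in-memory SQLite DB
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pg_tables (schemaname TEXT, tablename TEXT, tableowner TEXT, hasindexes INTEGER)"
)
conn.executemany(
    "INSERT INTO pg_tables VALUES (?, ?, ?, ?)",
    [("public", "produtos", "stack_user", 0),
     ("pg_catalog", "pg_statistic", "azure_superuser", 1)],
)

# like option("dbtable", ...): every column comes back
full_rows = conn.execute("SELECT * FROM pg_tables").fetchall()
# like option("query", ...): the database projects the columns first
narrow_rows = conn.execute("SELECT schemaname, tablename FROM pg_tables").fetchall()

print(len(full_rows[0]), len(narrow_rows[0]))  # 4 2
conn.close()
```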

In [None]:
display(pgDF.collect())

schemaname,tablename
public,produtos
pg_catalog,pg_statistic
pg_catalog,pg_foreign_table
pg_catalog,pg_authid
pg_catalog,pg_user_mapping
pg_catalog,pg_subscription
pg_catalog,pg_largeobject
pg_catalog,pg_type
pg_catalog,pg_attribute
pg_catalog,pg_proc


In [None]:
# write pgDF (the schemaname/tablename result above) into the "produtos" table,
# replacing the table if it already exists
pgDF.write.mode("overwrite")\
.format("jdbc")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password={your_password}&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "stack_user")\
.option("password", "{your_password}")\
.save()

In [None]:
# read the "produtos" table back into the dataframe df_produtos
df_produtos = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password={your_password}&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "stack_user").option("password", "{your_password}").load()

In [None]:
# printing all the rows
display(df_produtos.collect())

schemaname,tablename
public,produtos
pg_catalog,pg_statistic
pg_catalog,pg_foreign_table
pg_catalog,pg_authid
pg_catalog,pg_user_mapping
pg_catalog,pg_subscription
pg_catalog,pg_largeobject
pg_catalog,pg_type
pg_catalog,pg_attribute
pg_catalog,pg_proc


#### Advancing with PySpark

In [None]:
df.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [None]:
# sum of UnitPrice for each country
df.groupBy("Country").sum("UnitPrice").show()

+--------------+------------------+
|       Country|    sum(UnitPrice)|
+--------------+------------------+
|       Germany| 93.82000000000002|
|        France|             55.29|
|          EIRE|133.64000000000001|
|        Norway|            102.67|
|     Australia|              73.9|
|United Kingdom|12428.080000000024|
|   Netherlands|             16.85|
+--------------+------------------+



In [None]:
# number of rows (records) per country
df.groupBy("Country").count().show()

+--------------+-----+
|       Country|count|
+--------------+-----+
|       Germany|   29|
|        France|   20|
|          EIRE|   21|
|        Norway|   73|
|     Australia|   14|
|United Kingdom| 2949|
|   Netherlands|    2|
+--------------+-----+



In [None]:
# Now we get the min value by country
df.groupBy("Country").min("UnitPrice").show()

+--------------+--------------+
|       Country|min(UnitPrice)|
+--------------+--------------+
|       Germany|          0.42|
|        France|          0.42|
|          EIRE|          0.65|
|        Norway|          0.29|
|     Australia|          0.85|
|United Kingdom|           0.0|
|   Netherlands|          1.85|
+--------------+--------------+



In [None]:
# the max price by country
df.groupBy("Country").max("UnitPrice").show()

+--------------+--------------+
|       Country|max(UnitPrice)|
+--------------+--------------+
|       Germany|          18.0|
|        France|          18.0|
|          EIRE|          50.0|
|        Norway|          7.95|
|     Australia|           8.5|
|United Kingdom|        607.49|
|   Netherlands|          15.0|
+--------------+--------------+



In [None]:
# the average unit price by country
df.groupBy("Country").avg("UnitPrice").show()

+--------------+------------------+
|       Country|    avg(UnitPrice)|
+--------------+------------------+
|       Germany| 3.235172413793104|
|        France|            2.7645|
|          EIRE|6.3638095238095245|
|        Norway|1.4064383561643836|
|     Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
|   Netherlands|             8.425|
+--------------+------------------+



In [None]:
# mean() is an alias for avg(), so the result is identical
df.groupBy("Country").mean("UnitPrice").show()

+--------------+------------------+
|       Country|    avg(UnitPrice)|
+--------------+------------------+
|       Germany| 3.235172413793104|
|        France|            2.7645|
|          EIRE|6.3638095238095245|
|        Norway|1.4064383561643836|
|     Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
|   Netherlands|             8.425|
+--------------+------------------+
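Each aggregation above runs as its own Spark job. As a rough plain-Python sketch of what `groupBy(key)` followed by count/min/max/avg computes per group: the two Netherlands prices are read off the outputs above (count 2, min 1.85, max 15.0), while the "Hypothetical" group is made-up data for illustration.

```python
from collections import defaultdict

# (Country, UnitPrice) pairs: Netherlands values come from the outputs above,
# "Hypothetical" is invented illustration data
rows = [
    ("Netherlands", 1.85),
    ("Netherlands", 15.0),
    ("Hypothetical", 0.5),
    ("Hypothetical", 1.5),
    ("Hypothetical", 2.5),
]


def group_stats(pairs):
    """Mimic groupBy(key) followed by count, min, max and avg on (key, value) pairs."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {
        key: {
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
        }
        for key, values in groups.items()
    }


stats = group_stats(rows)
print(stats["Netherlands"])
print(stats["Hypothetical"])
```

In PySpark, all four statistics can be requested in a single pass with `df.groupBy("Country").agg(F.count("*"), F.min("UnitPrice"), F.max("UnitPrice"), F.avg("UnitPrice"))`, after `from pyspark.sql import functions as F`.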



In [None]:
# GroupBy multiple columns
df.groupBy("Country","CustomerID") \
    .sum("UnitPrice") \
    .show()

+--------------+----------+------------------+
|       Country|CustomerID|    sum(UnitPrice)|
+--------------+----------+------------------+
|United Kingdom|   17420.0| 38.99999999999999|
|United Kingdom|   15922.0|              48.5|
|United Kingdom|   16250.0|             47.27|
|United Kingdom|   13065.0| 73.11000000000001|
|United Kingdom|   18074.0|62.150000000000006|
|United Kingdom|   16048.0|12.969999999999999|
|       Germany|   12472.0|             49.45|
|United Kingdom|   18085.0|              34.6|
|United Kingdom|   17905.0|109.90000000000003|
|United Kingdom|   17841.0|254.63999999999982|
|United Kingdom|   15291.0|               6.0|
|United Kingdom|   17951.0|22.000000000000004|
|United Kingdom|   13255.0|27.299999999999997|
|United Kingdom|   17690.0|              34.8|
|United Kingdom|   18229.0|             48.65|
|United Kingdom|   15605.0| 58.20000000000002|
|United Kingdom|   18011.0| 66.10999999999999|
|United Kingdom|   17809.0|              1.45|
|United Kingd
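Values such as 38.99999999999999 in `sum(UnitPrice)` are ordinary floating-point rounding: UnitPrice is a double, most decimal prices have no exact binary representation, and adding many of them accumulates tiny errors. A plain-Python illustration with hypothetical prices:

```python
import math
from decimal import Decimal

prices = [0.1] * 10  # ten hypothetical prices of 0.10

naive = sum(prices)                                 # binary floating point, like a double column
exact = math.fsum(prices)                           # error-compensated float summation
rounded = round(naive, 2)                           # rounding only for display
as_decimal = sum(Decimal(str(p)) for p in prices)   # decimal arithmetic, similar in spirit to DecimalType

print(naive)       # 0.9999999999999999
print(exact)       # 1.0
print(rounded)     # 1.0
print(as_decimal)  # 1.0
```

In PySpark, rounding the aggregate is usually enough for display, e.g. `df.groupBy("Country", "CustomerID").agg(F.round(F.sum("UnitPrice"), 2))` with `from pyspark.sql import functions as F`; for exact money arithmetic the column can be cast to a `DecimalType` instead.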

#### Working with dates
- PySpark has many built-in functions to manipulate dates and timestamps.
- Avoid writing your own functions (UDFs) for this; the built-ins run natively inside Spark:
    - current_date()
    - date_format(dateExpr, format)
    - to_date(column)
    - to_date(column, fmt)
    - add_months(column, numMonths)
    - date_add(column, days)
    - date_sub(column, days)
    - datediff(end, start)
    - current_timestamp()
    - hour(column)
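For intuition, here is a plain-Python sketch of what `date_add`, `date_sub` and `add_months` compute on calendar dates. It assumes Spark 3.x semantics for `add_months`, where the day is clamped to the length of the target month (Spark 2.4 and earlier also moved month-end dates to the end of the new month).

```python
import calendar
from datetime import date, timedelta


def date_add(start: date, days: int) -> date:
    """Like Spark's date_add: shift forward by a number of days."""
    return start + timedelta(days=days)


def date_sub(start: date, days: int) -> date:
    """Like Spark's date_sub: shift back by a number of days."""
    return start - timedelta(days=days)


def add_months(start: date, num_months: int) -> date:
    """Like Spark 3.x add_months: move the month, clamping the day to the target month's length."""
    month_index = start.year * 12 + (start.month - 1) + num_months
    year, month0 = divmod(month_index, 12)
    last_day = calendar.monthrange(year, month0 + 1)[1]
    return date(year, month0 + 1, min(start.day, last_day))


print(date_add(date(2010, 12, 1), 31))   # 2011-01-01
print(date_sub(date(2010, 12, 1), 1))    # 2010-11-30
print(add_months(date(2021, 1, 31), 1))  # 2021-02-28 (day clamped)
```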

In [None]:
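# Loading the raw CSV from the bronze layer and displaying it
# mode "permissive" (the default) sets malformed fields to NULL instead of failing the read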

df = spark.read.format("csv")\
.option("mode", "permissive")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")

df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [None]:
df.printSchema()

# Note that InvoiceDate was not recognized as a timestamp: it was read as a string

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)
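The date functions in the next cells still work because Spark casts the string implicitly, but converting once makes the type explicit. In PySpark that would be `df = df.withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), "yyyy-MM-dd HH:mm:ss"))`. For comparison, the same parse in plain Python; note that Python uses strptime codes (`%Y`, `%H`, ...) instead of Spark's pattern letters (`yyyy`, `HH`, ...):

```python
from datetime import datetime

raw = "2010-12-01 08:26:00"  # an InvoiceDate value as read from the CSV (a string)
# Spark pattern "yyyy-MM-dd HH:mm:ss" corresponds to Python's "%Y-%m-%d %H:%M:%S"
parsed = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")

print(type(raw).__name__, "->", type(parsed).__name__)              # str -> datetime
print(parsed.year, parsed.month, parsed.day, parsed.hour, parsed.minute)  # 2010 12 1 8 26
```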



In [None]:
# Importing the date functions
# (the wildcard import also shadows Python built-ins such as sum, min, max and round)
from pyspark.sql.functions import *
# current_date()
df.select(current_date().alias("current_date")).show(1) #selecting a column and creating an alias for it

+------------+
|current_date|
+------------+
|  2021-12-06|
+------------+
only showing top 1 row



In [None]:
#date_format()
df.select(col("InvoiceDate"), \
          date_format(col("InvoiceDate"), "dd-MM-yyyy HH:mm:ss")\
          .alias("date_format")).show()
# date_format(column, pattern) formats a date/timestamp as a string.
# HH is the 24-hour clock; hh would print afternoon hours on a 12-hour clock.

+-------------------+-------------------+
|        InvoiceDate|        date_format|
+-------------------+-------------------+
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:28:00|01-12-2010 08:28:00|
|2010-12-01 08:28:00|01-12-2010 08:28:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
+-------------------+-------------
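In Spark's datetime patterns, `hh` is the 12-hour clock and `HH` the 24-hour clock. Every invoice shown above is from the morning, so both print the same, but an afternoon timestamp would not. The plain-Python equivalents are `%I` and `%H`, shown here on a hypothetical 14:26 timestamp:

```python
from datetime import datetime

ts = datetime(2010, 12, 1, 14, 26, 0)  # hypothetical afternoon invoice time

twelve_hour = ts.strftime("%d-%m-%Y %I:%M:%S")  # like Spark "dd-MM-yyyy hh:mm:ss"
twenty_four = ts.strftime("%d-%m-%Y %H:%M:%S")  # like Spark "dd-MM-yyyy HH:mm:ss"

print(twelve_hour)  # 01-12-2010 02:26:00
print(twenty_four)  # 01-12-2010 14:26:00
```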

In [None]:
# datediff(end, start): number of days from start to end
# Here: days between today (current_date) and each InvoiceDate
df.select(col("InvoiceDate"),
    datediff(current_date(),col("InvoiceDate")).alias("datediff")  
  ).show()

+-------------------+--------+
|        InvoiceDate|datediff|
+-------------------+--------+
|2010-12-01 08:26:00|    4023|
|2010-12-01 08:26:00|    4023|
|2010-12-01 08:26:00|    4023|
|2010-12-01 08:26:00|    4023|
|2010-12-01 08:26:00|    4023|
|2010-12-01 08:26:00|    4023|
|2010-12-01 08:26:00|    4023|
|2010-12-01 08:28:00|    4023|
|2010-12-01 08:28:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
|2010-12-01 08:34:00|    4023|
+-------------------+--------+
only showing top 20 rows
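`datediff` compares calendar dates and ignores the time of day, which is why every row above gets 4023 (the notebook ran on 2021-12-06, as the `current_date` cell shows). A plain-Python check that contrasts it with subtracting full datetimes:

```python
from datetime import date, datetime

today = date(2021, 12, 6)                  # current_date() when the notebook ran
invoice = datetime(2010, 12, 1, 8, 26, 0)  # first InvoiceDate in the data

# Spark-style datediff: truncate the timestamp to its date, then count days
days_by_date = (today - invoice.date()).days
# Subtracting full datetimes counts whole 24-hour periods instead
days_by_time = (datetime.combine(today, datetime.min.time()) - invoice).days

print(days_by_date)  # 4023
print(days_by_time)  # 4022
```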



In [None]:
# months_between(date1, date2): number of months from date2 to date1, possibly fractional
df.select(col("InvoiceDate"), 
    months_between(current_date(),col("InvoiceDate")).alias("months_between")  
  ).show()

+-------------------+--------------+
|        InvoiceDate|months_between|
+-------------------+--------------+
|2010-12-01 08:26:00|   132.1499552|
|2010-12-01 08:26:00|   132.1499552|
|2010-12-01 08:26:00|   132.1499552|
|2010-12-01 08:26:00|   132.1499552|
|2010-12-01 08:26:00|   132.1499552|
|2010-12-01 08:26:00|   132.1499552|
|2010-12-01 08:26:00|   132.1499552|
|2010-12-01 08:28:00|  132.14991039|
|2010-12-01 08:28:00|  132.14991039|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
|2010-12-01 08:34:00|  132.14977599|
+-------------------+--------------+
only showing top 20 rows
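The fractional part follows Spark's documented rule: if both timestamps fall on the same day of the month, or both on the last day of their month, the result is a whole number; otherwise the remainder is computed assuming 31-day months and rounded to 8 digits. The sketch below is a plain-Python reimplementation of that rule for illustration (not Spark's source), and it reproduces the values above when `current_date()` is taken as 2021-12-06 at midnight:

```python
import calendar
from datetime import datetime

SECONDS_PER_DAY = 86_400


def months_between(ts1: datetime, ts2: datetime, round_off: bool = True) -> float:
    """Plain-Python sketch of Spark's months_between(ts1, ts2) rule."""
    month_diff = (ts1.year * 12 + ts1.month) - (ts2.year * 12 + ts2.month)
    last1 = calendar.monthrange(ts1.year, ts1.month)[1]
    last2 = calendar.monthrange(ts2.year, ts2.month)[1]
    # Same day of month, or both on the last day of their month: whole months, time ignored
    if ts1.day == ts2.day or (ts1.day == last1 and ts2.day == last2):
        return float(month_diff)

    def seconds_of_day(ts: datetime) -> int:
        return ts.hour * 3600 + ts.minute * 60 + ts.second

    # Remaining days and time of day, expressed as a fraction of a 31-day month
    seconds_diff = (ts1.day - ts2.day) * SECONDS_PER_DAY + seconds_of_day(ts1) - seconds_of_day(ts2)
    result = month_diff + seconds_diff / (31 * SECONDS_PER_DAY)
    return round(result, 8) if round_off else result


today = datetime(2021, 12, 6)  # current_date() at midnight
print(months_between(today, datetime(2010, 12, 1, 8, 26)))  # 132.1499552
print(months_between(today, datetime(2010, 12, 1, 8, 34)))  # 132.14977599
```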



In [None]:
# Extracting the year, the month, the next Sunday after the date (next_day) and the week of the year
df.select(col("InvoiceDate"), 
     year(col("InvoiceDate")).alias("year"), 
     month(col("InvoiceDate")).alias("month"), 
     next_day(col("InvoiceDate"),"Sunday").alias("next_day"), 
     weekofyear(col("InvoiceDate")).alias("weekofyear") 
  ).show(10)

+-------------------+----+-----+----------+----------+
|        InvoiceDate|year|month|  next_day|weekofyear|
+-------------------+----+-----+----------+----------+
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:28:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:28:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
+-------------------+----+-----+----------+----------+
only showing top 10 rows
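`next_day(date, "Sunday")` returns the first Sunday strictly after the date, and `weekofyear` follows ISO 8601 (weeks start on Monday; week 1 is the first week with more than three days in the new year). A plain-Python sketch that checks the first row (2010-12-01, a Wednesday):

```python
from datetime import date, timedelta

SUNDAY = 6  # Python's date.weekday(): Monday=0 ... Sunday=6


def next_day(d: date, weekday: int) -> date:
    """First date strictly after d that falls on `weekday` (Python numbering)."""
    days_ahead = (weekday - d.weekday() - 1) % 7 + 1  # always between 1 and 7
    return d + timedelta(days=days_ahead)


def weekofyear(d: date) -> int:
    """ISO 8601 week number."""
    return d.isocalendar()[1]


invoice_day = date(2010, 12, 1)
print(next_day(invoice_day, SUNDAY))  # 2010-12-05
print(weekofyear(invoice_day))        # 48
```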



In [None]:
# Day of the week, day of the month, day of the year
df.select(col("InvoiceDate"),  
     dayofweek(col("InvoiceDate")).alias("dayofweek"), 
     dayofmonth(col("InvoiceDate")).alias("dayofmonth"), 
     dayofyear(col("InvoiceDate")).alias("dayofyear"), 
  ).show()

+-------------------+---------+----------+---------+
|        InvoiceDate|dayofweek|dayofmonth|dayofyear|
+-------------------+---------+----------+---------+
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:28:00|        4|         1|      335|
|2010-12-01 08:28:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|    
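Note the numbering: Spark's `dayofweek` counts Sunday as 1 through Saturday as 7, so the Wednesday 2010-12-01 gives 4, while Python's own `weekday()` and `isoweekday()` start the week on Monday. A small plain-Python conversion, plus `dayofyear` for comparison:

```python
from datetime import date


def spark_dayofweek(d: date) -> int:
    """Spark-style numbering: Sunday=1, Monday=2, ..., Saturday=7."""
    # isoweekday() is Monday=1 ... Sunday=7; shift so that Sunday becomes 1
    return d.isoweekday() % 7 + 1


def dayofyear(d: date) -> int:
    """Day of the year, from 1 to 366."""
    return d.timetuple().tm_yday


invoice_day = date(2010, 12, 1)
print(spark_dayofweek(invoice_day), invoice_day.isoweekday())  # 4 3
print(dayofyear(invoice_day))  # 335
```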

In [None]:
# current_timestamp() returns the current date and time, shown in the Spark session time zone
df.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)

+-----------------------+
|current_timestamp      |
+-----------------------+
|2021-12-06 13:09:37.916|
+-----------------------+
only showing top 1 row



In [None]:
# Extracting the hour, minute and second
df.select(col("InvoiceDate"), 
    hour(col("InvoiceDate")).alias("hour"), 
    minute(col("InvoiceDate")).alias("minute"),
    second(col("InvoiceDate")).alias("second") 
  ).show()

+-------------------+----+------+------+
|        InvoiceDate|hour|minute|second|
+-------------------+----+------+------+
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:28:00|   8|    28|     0|
|2010-12-01 08:28:00|   8|    28|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
+-------------------+----+------+------+
only showing top

#### Missing Values with PySpark

In [None]:
# Listing the sample datasets that ship with Databricks; the flights one contains missing values

display(dbutils.fs.ls("/databricks-datasets"))

path,name,size
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-datasets/COVID/,COVID/,0
dbfs:/databricks-datasets/README.md,README.md,976
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359
dbfs:/databricks-datasets/adult/,adult/,0
dbfs:/databricks-datasets/airlines/,airlines/,0
dbfs:/databricks-datasets/amazon/,amazon/,0
dbfs:/databricks-datasets/asa/,asa/,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0


In [None]:
# inferSchema = True
# header = True

arquivo = "dbfs:/databricks-datasets/flights/"

df = spark \
.read.format("csv")\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [None]:
df.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [None]:
# Keeping only the rows where delay is NULL
# (these appear to be lines of a tab-separated airport file in the same folder, parsed as CSV)
df.filter("delay is NULL").show()

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford	BC	Can...| null|    null|  null|       null|
| Aberdeen	SD	USA	ABR| null|    null|  null|       null|
|  Abilene	TX	USA	ABI| null|    null|  null|       null|
|    Akron	OH	USA	CAK| null|    null|  null|       null|
|  Alamosa	CO	USA	ALS| null|    null|  null|       null|
|   Albany	GA	USA	ABY| null|    null|  null|       null|
|   Albany	NY	USA	ALB| null|    null|  null|       null|
|Albuquerque	NM	US...| null|    null|  null|       null|
|Alexandria	LA	USA...| null|    null|  null|       null|
|Allentown	PA	USA	ABE| null|    null|  null|       null|
| Alliance	NE	USA	AIA| null|    null|  null|       null|
|   Alpena	MI	USA	APN| null|    null|  null|       null|
|  Altoona	PA	USA	AOO| null|    null|  null|       null|
| Amarillo	TX	USA	AMA| null|    null|  null|       null|
|Anahim Lake	BC	Ca...| null|   

In [None]:
# Equivalent filter written with the Column API
df.filter(df.delay.isNull()).show(10)

In [None]:
# Replacing NULL with 0 (a numeric value only fills numeric columns)
df.na.fill(value=0).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [None]:
# Replacing NULL with 0 only in the delay column
df.na.fill(value=0, subset=['delay']).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [None]:
# Replacing NULL with an empty string (a string value only fills string columns)
df.na.fill("").show(100)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|
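`na.fill` only touches columns whose type matches the replacement value: a number fills numeric columns, a string fills string columns, and `subset` narrows the check further. That is why filling with 0 leaves NULL text columns such as origin untouched. A plain-Python sketch of that rule, on two hypothetical rows with an assumed schema:

```python
# Assumed schema and rows for illustration (not the real flights data)
SCHEMA = {"delay": int, "distance": int, "origin": str, "destination": str}
ROWS = [
    {"delay": 6, "distance": 602, "origin": "ABE", "destination": "ATL"},
    {"delay": None, "distance": None, "origin": None, "destination": None},
]
NUMERIC = (int, float)


def _type_matches(col_type: type, value) -> bool:
    """Numbers fill numeric columns, strings fill string columns, booleans fill boolean columns."""
    if isinstance(value, bool):
        return col_type is bool
    if isinstance(value, NUMERIC):
        return col_type in NUMERIC
    return col_type is type(value)


def na_fill(rows, value, subset=None):
    """Return new rows with None replaced by `value` in type-compatible columns."""
    cols = subset if subset is not None else list(SCHEMA)
    filled = []
    for row in rows:
        new_row = dict(row)  # like Spark, never modify the input
        for c in cols:
            if new_row[c] is None and _type_matches(SCHEMA[c], value):
                new_row[c] = value
        filled.append(new_row)
    return filled


print(na_fill(ROWS, 0)[1])                    # numeric columns filled, text columns still None
print(na_fill(ROWS, "")[1])                   # text columns filled, numeric columns still None
print(na_fill(ROWS, 0, subset=["delay"])[1])  # only delay filled
```

With the real DataFrame, the dict form `df.na.fill({"delay": 0, "origin": "unknown"})` sets a different value per column in a single call.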

In [None]:
# na.fill returns a new DataFrame and df was never reassigned, so df still has its NULL rows
df.filter("delay is NULL").show()

In [None]:
# Dropping every row that has a NULL in any column. Use with care: the whole row is removed, not just the missing value
df.na.drop().show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|
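`na.drop()` with no arguments uses `how="any"`, removing a row if any column is NULL. `how="all"` removes only rows where every column is NULL, `subset` limits the check to some columns, and `thresh` keeps rows with at least that many non-NULL values (overriding `how`). A plain-Python sketch of those rules on hypothetical rows:

```python
def na_drop(rows, how="any", thresh=None, subset=None):
    """Keep rows with at least `thresh` non-None values among the checked columns."""
    if not rows:
        return []
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    cols = subset if subset is not None else list(rows[0])
    if thresh is None:
        # "any": every checked column must be non-null; "all": at least one must be
        thresh = len(cols) if how == "any" else 1
    return [row for row in rows if sum(row[c] is not None for c in cols) >= thresh]


# Hypothetical rows
rows = [
    {"delay": 6, "distance": 602, "origin": "ABE"},
    {"delay": None, "distance": 369, "origin": "ABE"},
    {"delay": None, "distance": None, "origin": None},
]
print(len(na_drop(rows)))                       # 1
print(len(na_drop(rows, how="all")))            # 2
print(len(na_drop(rows, subset=["distance"])))  # 2
print(len(na_drop(rows, thresh=2)))             # 2
```

The PySpark calls use the same parameter names: `df.na.drop(how="all")`, `df.na.drop(subset=["delay"])`, `df.na.drop(thresh=2)`.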

#### Basic tasks

In [None]:
# Adding a column: withColumn(name, expression) creates 'Nova Coluna' ("New Column") holding delay + 2
df = df.withColumn('Nova Coluna', df['delay'] + 2)
df.show(10)

+--------+-----+--------+------+-----------+-----------+
|    date|delay|distance|origin|destination|Nova Coluna|
+--------+-----+--------+------+-----------+-----------+
|01011245|    6|     602|   ABE|        ATL|        8.0|
|01020600|   -8|     369|   ABE|        DTW|       -6.0|
|01021245|   -2|     602|   ABE|        ATL|        0.0|
|01020605|   -4|     602|   ABE|        ATL|       -2.0|
|01031245|   -4|     602|   ABE|        ATL|       -2.0|
|01030605|    0|     602|   ABE|        ATL|        2.0|
|01041243|   10|     602|   ABE|        ATL|       12.0|
|01040605|   28|     602|   ABE|        ATL|       30.0|
|01051245|   88|     602|   ABE|        ATL|       90.0|
|01050605|    9|     602|   ABE|        ATL|       11.0|
+--------+-----+--------+------+-----------+-----------+
only showing top 10 rows



In [None]:
# Renaming a column (the result is only displayed, not assigned, so df keeps 'Nova Coluna')
df.withColumnRenamed('Nova Coluna','Delay_2').show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination|Delay_2|
+--------+-----+--------+------+-----------+-------+
|01011245|    6|     602|   ABE|        ATL|    8.0|
|01020600|   -8|     369|   ABE|        DTW|   -6.0|
|01021245|   -2|     602|   ABE|        ATL|    0.0|
|01020605|   -4|     602|   ABE|        ATL|   -2.0|
|01031245|   -4|     602|   ABE|        ATL|   -2.0|
|01030605|    0|     602|   ABE|        ATL|    2.0|
|01041243|   10|     602|   ABE|        ATL|   12.0|
|01040605|   28|     602|   ABE|        ATL|   30.0|
|01051245|   88|     602|   ABE|        ATL|   90.0|
|01050605|    9|     602|   ABE|        ATL|   11.0|
|01061215|   -6|     602|   ABE|        ATL|   -4.0|
|01061725|   69|     602|   ABE|        ATL|   71.0|
|01061230|    0|     369|   ABE|        DTW|    2.0|
|01060625|   -3|     602|   ABE|        ATL|   -1.0|
|01070600|    0|     369|   ABE|        DTW|    2.0|
|01071725|    0|     602|   ABE|        ATL|  

In [None]:
# Removing the new column

df = df.drop('Nova Coluna')
df.show(10)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 10 rows



#### Working with UDFs
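- UDFs let you reuse your own Python code in Spark, from both the DataFrame API and SQL.
- Use them with care: Python UDFs process data row by row in separate Python worker processes (data is serialized between the JVM and Python) and are opaque to Spark's optimizer, so prefer a built-in function when one exists.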

In [None]:
from pyspark.sql.types import LongType

# quadrado is Portuguese for "square"
def quadrado(s):
  return s * s
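One caveat: Spark passes SQL NULL to a Python UDF as None, and `quadrado(None)` would raise a TypeError that fails the job. The range used below has no NULLs, but a small wrapper (a hypothetical helper, `null_safe`) makes any one-argument function return None for None, mirroring SQL NULL propagation:

```python
import functools


def null_safe(func):
    """Wrap a one-argument function so that None in gives None out."""
    @functools.wraps(func)
    def wrapper(value):
        if value is None:
            return None
        return func(value)
    return wrapper


def quadrado(s):
    return s * s


quadrado_seguro = null_safe(quadrado)  # hypothetical name ("safe square")
print(quadrado_seguro(4))     # 16
print(quadrado_seguro(None))  # None
```

It can then be registered like the original, e.g. `spark.udf.register("Func_Py_Quadrado", null_safe(quadrado), LongType())`.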

In [None]:
# Registering the function for use in Spark SQL and declaring its return type

from pyspark.sql.types import LongType
spark.udf.register("Func_Py_Quadrado", quadrado, LongType())

# register(name, f, returnType): exposes quadrado to SQL as Func_Py_Quadrado, returning LongType

Out[57]: <function __main__.quadrado(s)>

In [None]:
# spark.range(1, 20) generates the sequential ids 1 to 19 (the end is exclusive); they are not random
spark.range(1, 20).show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [None]:
# Registering this range as a temporary view named View_temp so it can be queried with SQL
spark.range(1, 20).createOrReplaceTempView("View_temp")

In [None]:
%sql
-- Using a python function mixed with SQL code

select id, Func_Py_Quadrado(id) as id_ao_quadrado
from View_temp

-- Applying Func_Py_Quadrado to id and returning the square as id_ao_quadrado ("id squared")

id,id_ao_quadrado
1,1
2,4
3,9
4,16
5,25
6,36
7,49
8,64
9,81
10,100


##### UDFs with DataFrames
###### User-defined functions (UDFs)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
Func_Py_Quadrado = udf(quadrado, LongType())

# Squaring could be written with built-in column arithmetic (col("id") * col("id")); here quadrado is wrapped as a UDF object so it can be used in the DataFrame API

In [None]:
# Creating a dataframe from my view

df = spark.table("View_temp")

In [None]:
df.show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [None]:
display(df.select("id", Func_Py_Quadrado("id").alias("id_quadrado")))

# Applying Func_Py_Quadrado to the "id" column and naming the result "id_quadrado" ("id squared")
# The same Python function is now used through the DataFrame API

id,id_quadrado
1,1
2,4
3,9
4,16
5,25
6,36
7,49
8,64
9,81
10,100


#### Koalas
- Runs pandas-style Python code on top of PySpark.
- Koalas is an open source project that offers a drop-in replacement for pandas.
- pandas runs on a single machine; Koalas fills this gap by providing pandas-equivalent APIs that run on Apache Spark.
- Koalas is useful not only for pandas users but also for PySpark users.
  - Koalas supports many tasks that are cumbersome in PySpark, for example plotting data directly from a PySpark DataFrame.
- Koalas supports running SQL directly on its DataFrames.
- Since Spark 3.2, Koalas has been merged into PySpark as the pandas API on Spark (pyspark.pandas).

In [None]:
import numpy as np
import pandas as pd
import databricks.koalas as ks

# The import path (databricks.koalas) shows that Koalas is a Databricks project

In [None]:
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [None]:
type(pdf)

Out[65]: pandas.core.frame.DataFrame

In [None]:
# Creating a koala df
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [None]:
type(kdf)

Out[67]: databricks.koalas.frame.DataFrame

In [None]:
# Creating a koala df from a pd df
kdf = ks.DataFrame(pdf)
type(kdf)

Out[68]: databricks.koalas.frame.DataFrame

In [None]:
# other ways to transform
kdf = ks.from_pandas(pdf)
type(kdf)

In [None]:
pdf.head()

,A,B
0,0.813516,0.527949
1,0.68385,0.468637
2,0.215054,0.058884
3,0.576319,0.378034
4,0.313028,0.762469


In [None]:
kdf.head()

,A,B
0,0.599634,0.706762
1,0.534563,0.055601
2,0.704126,0.03555
3,0.161522,0.768715
4,0.304619,0.921843


In [None]:
kdf.describe()

,A,B
count,5.0,5.0
mean,0.460893,0.497694
std,0.22242,0.420145
min,0.161522,0.03555
25%,0.304619,0.055601
50%,0.534563,0.706762
75%,0.599634,0.768715
max,0.704126,0.921843


In [None]:
# sorting a df
kdf.sort_values(by='B')

,A,B
2,0.704126,0.03555
1,0.534563,0.055601
0,0.599634,0.706762
3,0.161522,0.768715
4,0.304619,0.921843


In [None]:
# Koalas options: compute.max_rows (default 1000) is the row limit under which
# some operations collect the data to the driver and run them with pandas
print(ks.get_option('compute.max_rows'))
ks.set_option('compute.max_rows', 2000)

In [None]:
# Selecting a list of columns
kdf[['A', 'B']]

,A,B
0,0.813516,0.527949
1,0.68385,0.468637
2,0.215054,0.058884
3,0.576319,0.378034
4,0.313028,0.762469


In [None]:
# loc selects by label; the slice 1:2 includes both endpoints
kdf.loc[1:2]

,A,B
1,0.68385,0.468637
2,0.215054,0.058884


In [None]:
# iloc selects by position: rows 0 to 2 and the column at position 1 (B)
kdf.iloc[:3, 1:2]

,B
0,0.527949
1,0.468637
2,0.058884


**Using Python functions with Koalas**

In [None]:
def quadrado(x):
    return x ** 2

In [None]:
# Allowing operations that combine different DataFrames/Series (off by default because it requires a potentially expensive join)
from databricks.koalas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)

In [None]:
# Creating column C by applying a Python function element-wise with apply
kdf['C'] = kdf.A.apply(quadrado)



In [None]:
kdf.head()

,A,B,C
0,0.813516,0.527949,0.661809
1,0.68385,0.468637,0.46765
2,0.215054,0.058884,0.046248
3,0.576319,0.378034,0.332144
4,0.313028,0.762469,0.097987


In [None]:
# grouping data
kdf.groupby('A').sum()

A,B,C
0.813516,0.527949,0.661809
0.68385,0.468637,0.46765
0.215054,0.058884,0.046248
0.576319,0.378034,0.332144
0.313028,0.762469,0.097987


In [None]:
# grouping multiple columns
kdf.groupby(['A', 'B']).sum()

A,B,C
0.813516,0.527949,0.661809
0.68385,0.468637,0.46765
0.215054,0.058884,0.046248
0.576319,0.378034,0.332144
0.313028,0.762469,0.097987


In [None]:
# Displaying plots inline in the notebook
%matplotlib inline

In [None]:
# Koalas plots natively with matplotlib

speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]

index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']

kdf = ks.DataFrame({'speed': speed,
                   'lifespan': lifespan}, index=index)
kdf.plot.bar()

**Using SQL in Koalas**

In [None]:
# koala df
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]})

In [None]:
# query on koala df
ks.sql("SELECT * FROM {kdf} WHERE pig > 100")

,year,pig,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [None]:
# Creating a pandas DataFrame
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})

In [None]:
# Query with inner join between pd and koala df
ks.sql('''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken''')

# Selecting pig from the Koalas DataFrame and chicken from the pandas DataFrame,
# inner-joining the two on year,
# and ordering by pig, then chicken

,pig,chicken
0,18,326
1,20,250
2,489,589
3,675,1241
4,1776,2118
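The join logic itself is plain SQL, so it can be checked without a cluster. A sketch using Python's built-in sqlite3 with the same two tables (the table names kdf and pdf stand in for the `{kdf}` and `{pdf}` placeholders):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kdf (year INTEGER, pig INTEGER, horse INTEGER)")
conn.execute("CREATE TABLE pdf (year INTEGER, sheep INTEGER, chicken INTEGER)")

# Same values as the Koalas and pandas DataFrames defined above
conn.executemany("INSERT INTO kdf VALUES (?, ?, ?)", [
    (1990, 20, 4), (1997, 18, 25), (2003, 489, 281), (2009, 675, 600), (2014, 1776, 1900),
])
conn.executemany("INSERT INTO pdf VALUES (?, ?, ?)", [
    (1990, 22, 250), (1997, 50, 326), (2003, 121, 589), (2009, 445, 1241), (2014, 791, 2118),
])

result = conn.execute("""
    SELECT k.pig, p.chicken
    FROM kdf k INNER JOIN pdf p ON k.year = p.year
    ORDER BY k.pig, p.chicken
""").fetchall()
conn.close()

print(result)  # [(18, 326), (20, 250), (489, 589), (675, 1241), (1776, 2118)]
```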


In [None]:
# converting koalas df to Pyspark df

kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
pydf = kdf.to_spark()

In [None]:
type(pydf)

Out[87]: pyspark.sql.dataframe.DataFrame

This brings us to the end of the exercise. This was a great module that taught me a lot about Big Data and eased my fear of learning this new step of data analysis.