# Usando o PySpark SQL

## Importar bibliotecas necessárias

In [1]:
from IPython.display import display, HTML
# Inject a CSS rule that forces elements with class `container` to 100% width
# (presumably so the notebook page uses the full browser width, which helps
# with wide `show()` tables -- confirm against the notebook front end in use).
display(HTML("<style>.container { width:100% !important; }</style>"))

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

## Iniciar sessão PySpark

In [2]:
# Entry point for all DataFrame / SQL calls in this notebook. `getOrCreate()`
# returns the already-active session when there is one, so re-running this
# cell does not start a second session. No app name, master or config is set
# here, so the builder's defaults apply.
spark = SparkSession.builder.getOrCreate()

24/01/31 22:08:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Ler arquivo CSV (base de dados)

In [3]:
# Load the raw sales CSV and apply the data transformations in a single chain.
# No schema / inferSchema is passed to the reader, so every column that is not
# explicitly cast below remains a string.
df = (
    spark.read.csv('../data/sales.csv', header=True, sep=',')
    # Drop the unit suffixes so the columns are easy to reference in SQL.
    .withColumnRenamed("UnitPrice(USD)", "UnitPrice")
    .withColumnRenamed("Shipping_Cost(USD)", "Shipping_Cost")
    .withColumnRenamed("Delivery_Time(Days)", "Delivery_Time")
    # Give the date and the delivery-time columns proper types.
    .withColumn("OrderDate", F.col("OrderDate").cast("date"))
    .withColumn("Delivery_Time", F.col("Delivery_Time").cast("integer"))
)

# Preview the first five rows without truncating cell contents.
df.show(n=5, truncate=False)

                                                                                

+-------------------+--------+---------+-------------+----------+----------------+-------------+-------------+-------------+----------------+------------+---------+
|OrderID            |Quantity|UnitPrice|Status       |OrderDate |Product_Category|Sales_Manager|Shipping_Cost|Delivery_Time|Shipping_Address|Product_Code|OrderCode|
+-------------------+--------+---------+-------------+----------+----------------+-------------+-------------+-------------+----------------+------------+---------+
|2951110000999929511|92      |238      |Not Delivered|2021-08-08|Healthcare      |Pablo        |21           |25           |Singapore       |HC-188      |444116   |
|2181910000999921819|61      |136      |Not Delivered|2021-10-03|Office          |Pablo        |34           |14           |UK              |O-555       |444772   |
|3239110000999932391|67      |235      |Not Delivered|2021-09-27|Office          |Kristen      |25           |11           |Kenya           |O-188       |444666   |
|111261000

## Criar tabela física

In [4]:
# Remove any existing `sales` table before it is (re)created. `IF EXISTS`
# keeps the statement from failing when the table is absent (e.g. on a first
# run); `PURGE` asks for the data to be deleted outright rather than moved to
# a trash location -- NOTE(review): whether PURGE has any effect depends on
# the catalog / metastore in use.
sql_ddl = "DROP TABLE IF EXISTS sales PURGE;"
# A DDL statement yields an empty DataFrame, which is what the cell displays.
spark.sql(sql_ddl)

DataFrame[]

In [5]:
# Persist the transformed DataFrame as the managed table `sales`, stored as
# Parquet; "overwrite" replaces the table contents if it already exists.
(
    df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("sales")
)

# Print the column names and types of the DataFrame that was just written.
df.printSchema()

[Stage 2:>                                                          (0 + 1) / 1]

root
 |-- OrderID: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Sales_Manager: string (nullable = true)
 |-- Shipping_Cost: string (nullable = true)
 |-- Delivery_Time: integer (nullable = true)
 |-- Shipping_Address: string (nullable = true)
 |-- Product_Code: string (nullable = true)
 |-- OrderCode: string (nullable = true)



                                                                                

## Verificar existência da tabela

In [6]:
# List the tables visible in the current database to confirm that `sales`
# was created; the result has one row per table.
df_tables = spark.sql("show tables")
# truncate=False prints full cell values instead of cutting them off.
df_tables.show(truncate=False)

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|default |sales    |false      |
+--------+---------+-----------+

