In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Stop a session left over from an earlier run, so the builder below starts
# a fresh one instead of getOrCreate() handing back the old session.
if 'spark' in globals() or 'spark' in locals():
    spark.stop()

# Build (or fetch) the session used by the rest of the notebook.
# NOTE(review): no S3A/MinIO settings are configured here; presumably they come
# from the cluster's Spark configuration — confirm before running elsewhere.
spark = (
    SparkSession.builder
    .appName("Testando")
    .getOrCreate()
)

# Keep notebook output readable: only ERROR-level Spark logs.
spark.sparkContext.setLogLevel("ERROR")

# Last expression of the cell: Jupyter displays the session's rich repr.
spark

# PySpark 3.3.2
### [Spark Session](http://127.0.0.1:4040)  
> ### [Spark Master](http://127.0.0.1:5050)  
>> |-- [Spark Worker A](http://127.0.0.1:5051)  
>> |-- [Spark Worker B](http://127.0.0.1:5052)

# Object Storage - [MinIO](http://127.0.0.1:9090/buckets)
usuário: `admin`  
senha: `password`

- PySpark + MinIO
- PySpark + MinIO + Hive Metastore

In [17]:
# Raw input: the sample sales CSV in the MinIO bucket, read through s3a.
path = "s3a://my-bucket/input/sample-data.csv"

# Explicit schema (all columns nullable) so Spark does not infer types.
# The CSV reader applies it positionally, so this list must follow the
# file's column order. Dates are deliberately kept as strings here.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("Row ID", IntegerType()),
        ("Order ID", StringType()),
        ("Order Date", StringType()),
        ("Ship Date", StringType()),
        ("Ship Mode", StringType()),
        ("Customer ID", StringType()),
        ("Customer Name", StringType()),
        ("Segment", StringType()),
        ("Country", StringType()),
        ("City", StringType()),
        ("State", StringType()),
        ("Postal Code", StringType()),
        ("Region", StringType()),
        ("Product ID", StringType()),
        ("Category", StringType()),
        ("Sub-Category", StringType()),
        ("Product Name", StringType()),
        ("Sales", DoubleType()),
        ("Quantity", IntegerType()),
        ("Discount", DoubleType()),
        ("Profit", DoubleType()),
    ]
])

# The first line of the file is a header row; names and types come from the
# schema above.
df = spark.read.schema(schema).option("header", True).csv(path)

# Quick sanity checks: row count (runs a full scan), schema, and a preview.
print(f'{df.count()} registros lidos')
df.printSchema()
df.show(5)

9994 registros lidos
root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)

+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+------------

In [18]:
# Total sales per (Region, Category), sorted for a stable, readable display.
# The aggregate column is named explicitly with `alias` instead of renaming
# Spark's auto-generated "sum(Sales)" afterwards: withColumnRenamed is a no-op
# when the name does not match, so a mismatch would go unnoticed.
#
# NOTE(review): this rebinds `df` from the raw rows to the aggregate, so
# re-running this cell on its own fails ("Sales" no longer exists) — re-run
# the load cell first. Later cells read `df` as the aggregate, so the name stays.
df = (
    df.groupBy('Region', 'Category')
    .agg(F.sum('Sales').alias('TotalSales'))
    .orderBy('Region', 'Category')
)
df.show()

+-------+---------------+------------------+
| Region|       Category|        TotalSales|
+-------+---------------+------------------+
|Central|      Furniture|162783.14380000005|
|Central|Office Supplies|164616.19700000016|
|Central|     Technology| 170401.5319999999|
|   East|      Furniture|205540.34800000011|
|   East|Office Supplies|201781.62299999985|
|   East|     Technology|264872.08300000033|
|  South|      Furniture| 116273.1360000001|
|  South|Office Supplies|123979.92499999993|
|  South|     Technology|148730.52399999992|
|   West|      Furniture|248450.23350000026|
|   West|Office Supplies|213125.18300000002|
|   West|     Technology|251895.92799999993|
+-------+---------------+------------------+



In [19]:
# Expose the aggregated frame to Spark SQL as a temporary view tied to this
# SparkSession, replacing any view of the same name from a previous run.
df.createOrReplaceTempView("all_regions_total_sales")

# Narrow the aggregate down to a single region with plain SQL.
query = """
SELECT *
FROM all_regions_total_sales
WHERE Region = 'South'
"""

queryResults = spark.sql(query)
queryResults.show()

+------+---------------+------------------+
|Region|       Category|        TotalSales|
+------+---------------+------------------+
| South|      Furniture| 116273.1360000001|
| South|Office Supplies|123979.92499999993|
| South|     Technology|148730.52399999992|
+------+---------------+------------------+



In [None]:
# Persist the aggregate as Snappy-compressed Parquet, laid out in one
# sub-directory per Region value. "overwrite" replaces whatever already
# exists at the output path.
(
    df.write
    .format('parquet')
    .mode('overwrite')
    .partitionBy('Region')
    .option("compression", "snappy")
    .save(path="s3a://my-bucket/output/exemplo_1/")
)