In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DateType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Spark session & context: build (or reuse, via getOrCreate) a session on the
# "local" master, and keep its SparkContext as `sc` for RDD-level APIs.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# Define schema: explicit column types for the raw transaction CSV, so Spark
# parses with fixed types instead of inferring them.
# NOTE(review): `value` looks like a monetary amount. FloatType is 32-bit binary
# floating point and cannot represent most decimal cent values exactly.
# Consider DecimalType (e.g. DecimalType(18, 2)); that would also change the
# type of the aggregated `balance` column written to parquet, so coordinate
# with downstream consumers before switching.
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("type", StringType(), True),     # sample data: 'DEPOSITO' / 'SAQUE'
    StructField("value", FloatType(), True),
    StructField("timestamp", DateType(), True),  # DateType: date only, no time of day
])

In [3]:
# Extract: read the raw CSV files (first line is a header) using the explicit
# `schema` defined above, inspect the result, and register it as the
# "transaction" temp view for the SQL in the next cell.
transaction = spark.read.csv(
    "/hdfs/raw/transaction",
    schema=schema,
    header=True,
)

transaction.printSchema()
transaction.show()
transaction.createOrReplaceTempView("transaction")

root
 |-- user_id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- value: float (nullable = true)
 |-- timestamp: date (nullable = true)

+-------+--------+-----+----------+
|user_id|    type|value| timestamp|
+-------+--------+-----+----------+
|      1|DEPOSITO|200.0|2019-02-01|
|      1|DEPOSITO|100.0|2019-02-02|
|      2|DEPOSITO|500.0|2019-02-02|
|      1|   SAQUE|150.0|2019-02-03|
|      2|DEPOSITO|500.0|2019-02-03|
+-------+--------+-----+----------+



In [4]:
# Transform: sign each movement (withdrawals, type 'SAQUE', become negative)
# and sum the signed amounts per user into `balance`.
# The signed rows are registered under their own view instead of overwriting
# "transaction": the projection drops the `type` column, so replacing the
# source view made this cell fail on re-run and lost the raw view.
# NOTE(review): every type other than 'SAQUE' (including NULL or a different
# casing) falls into the ELSE branch and is counted as a positive amount —
# confirm only 'DEPOSITO' / 'SAQUE' occur in the source data.
spark.sql("""
SELECT
    user_id,
    timestamp,

    CASE
        WHEN (type = 'SAQUE') THEN (-1 * value)
        ELSE value
    END as value

FROM transaction
""").createOrReplaceTempView("signed_transaction")

processed_transaction = spark.sql("""
SELECT
    user_id,
    SUM(value) as balance

FROM signed_transaction

GROUP BY user_id
""")

processed_transaction.printSchema()
processed_transaction.show()

root
 |-- user_id: integer (nullable = true)
 |-- balance: double (nullable = true)

+-------+-------+
|user_id|balance|
+-------+-------+
|      1|  150.0|
|      2| 1000.0|
+-------+-------+



In [11]:
# Load: write the per-user balances as parquet, replacing whatever a previous
# run left at the target path (mode "overwrite").
(
    processed_transaction.write
    .mode("overwrite")
    .parquet("/hdfs/processed/transaction/")
)