# IMPORTANDO USERS

In [None]:
from pyspark.sql.types import StructType, StringType, IntegerType, LongType, DateType 
# Session setting: switch Spark to the legacy datetime parser.
# NOTE(review): presumably needed for the two-digit-year pattern used below — confirm.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# Expected column layout of users.csv; only ID_ADVISOR is declared nullable.
schema_users = StructType() \
    .add("USER_ID", IntegerType(), False) \
    .add("DATE_REGISTER", DateType(), False) \
    .add("NAME", StringType(), False) \
    .add("ID_ADVISOR", IntegerType(), True)


# Read the semicolon-delimited CSV (with a header row) using the explicit schema.
# The date pattern must use `MM` (month-of-year): lowercase `mm` means
# minute-of-hour, so "dd/mm/yy" would never read the month from the file.
df_users = spark.read.format("csv") \
      .option("header", True) \
      .option("dateFormat", "dd/MM/yy") \
      .option("badRecordsPath", "/tmp/badRecordsUsers") \
      .option("delimiter", ";") \
      .schema(schema_users) \
      .load("dbfs:/FileStore/tables/users.csv")

# When ID_ADVISOR is missing, replace it with 0 to flag that no advisor
# has been assigned to the customer.
df_users = df_users.fillna(0, subset='ID_ADVISOR')

# Persist the result as a Parquet-backed table named USERS at the given path,
# overwriting whatever was stored there before.
df_users.write.mode('overwrite') \
      .format('parquet') \
      .option('path', 'dbfs:/FileStore/tables/users') \
      .saveAsTable("USERS")

In [None]:
%sql
-- Preview up to five rows of the USERS table.
SELECT *
FROM USERS
LIMIT 5;

USER_ID,DATE_REGISTER,NAME,ID_ADVISOR
1,2018-01-01,Aarão Camelo,1
2,2018-01-02,Aarão Castelhano,2
3,2018-01-03,Aarão Ferrera,3
4,2018-01-04,Aarão Fiestas,4
5,2018-01-05,Aarão Fraga,5


# IMPORTANDO ADVISORS

In [None]:
# Expected column layout of advisor.csv; every field is declared non-nullable.
schema_advisors = (
    StructType()
    .add("ID_ADVISOR", IntegerType(), False)
    .add("NAME", StringType(), False)
    .add("GROUP", StringType(), False)
    .add("OFFICE", StringType(), False)
)

# Read the semicolon-delimited CSV (with a header row) using the explicit schema.
df_advisors = (
    spark.read.format("csv")
    .options(
        header=True,
        badRecordsPath="/tmp/badRecordsAdvisors",
        delimiter=";",
    )
    .schema(schema_advisors)
    .load("dbfs:/FileStore/tables/advisor.csv")
)

# Persist as a Parquet-backed table named ADVISORS at the given path,
# overwriting any previous contents.
(
    df_advisors.write
    .format('parquet')
    .mode('overwrite')
    .option('path', 'dbfs:/FileStore/tables/advisors')
    .saveAsTable("ADVISORS")
)

In [None]:
%sql
-- Preview up to five rows of the ADVISORS table.
SELECT *
FROM ADVISORS
LIMIT 5;

ID_ADVISOR,NAME,GROUP,OFFICE
1,Abigail Cesário,B2C,Invest Idea
2,Adosindo Vargas,B2C,Invest Idea
3,Alceste Valgueiro,B2C,Invest Idea
4,Angélico Faustino,B2C,Invest Idea
5,Anselmo Saraíba,B2C,Invest Idea


# IMPORTANDO LEGACY

In [None]:
from pyspark.sql import SparkSession, functions as F
from urllib.request import urlopen
from pyspark.sql.functions import col,lit,create_map

url =  <LINK FORNECIDO>
jsonData = urlopen(url).read().decode('utf-8')
rdd = spark.sparkContext.parallelize([jsonData])
df = spark.read.json(rdd)


df1 = df.withColumn("transactionMAP",create_map(
        lit("type"),col("transaction.type"),
        lit("value"),col("transaction.value")
        )).drop("transaction")
                       
cols = [df1.user_id] + list(map(
    lambda f: df1.transactionMAP.getItem(f).alias(str(f)),
    ["type", "value"])) + [df1.date_transaction] 

df2 = df1.select(cols)


df2.write \
      .mode('overwrite') \
      .format('parquet') \
      .option('path', 'dbfs:/FileStore/tables/legacy') \
      .saveAsTable("LEGACY")

In [None]:
%sql
-- Preview up to five rows of the LEGACY table.
SELECT *
FROM LEGACY
LIMIT 5;

user_id,type,value,date_transaction
5678,debit,1068,2019-02-03
1075,debit,9055,2020-03-04
4939,credit,1640,2021-01-19
3782,credit,9412,2018-11-05
9318,debit,4027,2019-05-28


# CONSUMIR KAFKA

In [None]:
%scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DateType};
import org.apache.spark.sql.functions._

// Fields expected in the JSON payload of each Kafka message; all are nullable.
val schema = new StructType()
  .add("value", IntegerType, true)
  .add("user_id", IntegerType, true)
  .add("type_transaction", StringType, true)
  .add("date_transaction", DateType, true)

// Streaming source: subscribe to the `transactions` topic on the given broker.
val df = spark.readStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "dataengineertest.eastus.cloudapp.azure.com:9092",
    "subscribe" -> "transactions"
  ))
  .load()

// Cast the message value to a string, parse it against `schema`, then
// serialise the parsed struct back to JSON so the sink stores a single
// `value` column.
val df1 = df
  .select(from_json(col("value").cast("string"), schema).as("value"))
  .select(to_json(col("value")).as("value"))

// Start the streaming query: append Parquet files under the kafka directory,
// with the checkpoint kept in a sub-directory of that same path.
val ds = df1.writeStream
  .outputMode("append")
  .format("parquet")
  .option("checkpointLocation", "dbfs:/FileStore/tables/kafka/checkpoint")
  .option("path", "dbfs:/FileStore/tables/kafka")
  .start()

In [None]:
from pyspark.sql import functions as F

# Load the Parquet output of the streaming job; it holds a single JSON string
# column named `value`. No "header" option is passed: it is a CSV setting and
# has no meaning for a Parquet read.
dfjson = spark.read.format("parquet") \
      .load("dbfs:/FileStore/tables/kafka")

# Pull the four JSON fields out into top-level columns. The JSON field `value`
# is exposed as `value_transaction`. Only the extracted columns are selected,
# so the raw `value` column does not need to be selected and then dropped.
df_parsed = dfjson.select(
    F.json_tuple(F.col('value'), 'user_id', 'type_transaction', 'value', 'date_transaction')
    .alias('user_id', 'type_transaction', 'value_transaction', 'date_transaction'))

# Persist as a Parquet-backed table named KAFKA, overwriting previous contents.
# NOTE(review): the target path sits inside the directory read above — confirm
# this nesting is intentional.
df_parsed.write \
      .mode('overwrite') \
      .format('parquet') \
      .option('path', 'dbfs:/FileStore/tables/kafka/parsed') \
      .saveAsTable("KAFKA")

In [None]:
%sql
-- Preview up to five rows of the KAFKA table.
SELECT *
FROM KAFKA
LIMIT 5;

user_id,type_transaction,value_transaction,date_transaction
3729,credit,6075,2021-10-25
2052,credit,3430,2021-10-25
8965,credit,4389,2021-10-25
7891,credit,3520,2021-10-25
9582,credit,8661,2021-10-25
