### Importação de bibliotecas e sessão do pyspark:

In [51]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
import functools

# Reuse the already-active SparkSession if there is one; otherwise start a new
# session with default settings. Every read/write below goes through `spark`.
spark = SparkSession.builder.getOrCreate()

### Definindo schemas e carregando os dados:

In [52]:
# Explicit schema for the client CSV files; every column is declared nullable.
schema = (
    StructType()
    .add('Id', IntegerType(), True)
    .add('Nome', StringType(), True)
    .add('Email', StringType(), True)
    .add('Data_cadastro', TimestampType(), True)
    .add('Telefone', StringType(), True)
)

# The former `.option('inferSchema', True)` was removed: Spark does not infer
# types when an explicit schema is supplied, so the option had no effect and
# only suggested that column types were being guessed from the data.
# NOTE(review): no `header` option is set, so a header line (if the files have
# one) is parsed as data and ends up with nulls in the fields that fail to
# convert — confirm against the raw files whether `header=True` is wanted.
df_clients = (
    spark.read.format('csv')
    .schema(schema)
    .load('./data/clients/', delimiter=';')
)

df_clients.show()

+---+--------------------+--------------------+-------------------+----------------+
| Id|                Nome|               Email|      Data_cadastro|        Telefone|
+---+--------------------+--------------------+-------------------+----------------+
| 55|   Edmilson da silva|edmilson-da-silva...|2019-08-30 00:54:33|+55(22)2922-2626|
| 78|Maxson Barros do ...|maxson-barros-do-...|2019-09-10 02:03:42|+55(22)2126-2529|
| 61| Bruno cesar e silva|bruno-cesar-e-sil...|2019-08-30 01:05:21|+55(23)2528-2729|
|106|Dernival passos d...|dernival-passos-d...|2019-09-27 02:40:14|+55(29)2927-2322|
|107| José rubian de goes|jose-rubian-de-go...|2019-09-28 13:09:43|+55(22)2023-2620|
|108|Angelica Dos Sant...|angelica-dos-sant...|2019-09-28 14:48:16|+55(20)2521-3030|
|109|          Alanderson|alanderson_109@gm...|2019-09-29 00:15:12|+55(22)2323-2426|
|142|      Silvio gabriel|silvio-gabriel_14...|2019-10-04 12:47:21|+55(22)3026-2123|
|144|JOSÉ CARLOS DA SI...|jose-carlos-da-si...|2019-10-06 14:24:2

In [53]:

# Incoming and outgoing transaction files are read with the same layout,
# so a single schema (all columns nullable) is shared by both reads.
schema2 = (
    StructType()
    .add('Id', IntegerType(), True)
    .add('Cliente_Id', IntegerType(), True)
    .add('Valor', FloatType(), True)
    .add('DataHora', TimestampType(), True)
)

# `inferSchema` is not set: it is ignored when an explicit schema is given.
df_transaction_in = (
    spark.read.format('csv')
    .schema(schema2)
    .load('./data/transaction/in/', delimiter=';')
)

df_transaction_out = (
    spark.read.format('csv')
    .schema(schema2)
    .load('./data/transaction/out/', delimiter=';')
)

# DataFrames are immutable: `withColumn` returns a NEW frame, so the result has
# to be assigned back (previously it was discarded and the negation was lost).
# `-abs(...)` is used instead of a plain negation so outgoing amounts end up
# negative whether or not the source files already store them with a minus sign.
# NOTE(review): confirm the sign convention of the raw files in ./data/transaction/out/.
df_transaction_out = df_transaction_out.withColumn('Valor', -F.abs(F.col('Valor')))
df_transaction_out.show()

+----+----------+------+-------------------+
|  Id|Cliente_Id| Valor|           DataHora|
+----+----------+------+-------------------+
|null|      null|  null|               null|
|8607|       910|  -2.0|2022-01-19 20:15:26|
|8608|       910|  -2.0|2022-01-19 20:14:56|
|8609|       910|  -2.0|2022-01-19 20:14:26|
|8610|       910|  -2.0|2022-01-19 20:13:56|
|8612|       910|  -2.0|2022-01-19 20:13:26|
|8614|       910|  -2.0|2022-01-19 20:12:56|
|8573|       671| -10.0|2022-01-13 15:21:25|
|8574|       671| -10.0|2022-01-13 15:20:55|
|8575|       671|  -5.0|2022-01-13 15:20:25|
|8576|       671| -10.0|2022-01-13 15:19:55|
|8577|       671|  -2.0|2022-01-13 15:19:25|
|8580|       671| -10.0|2022-01-13 15:18:55|
|8581|       671|-100.0|2022-01-13 15:18:25|
|8582|       671| -15.0|2022-01-13 15:17:55|
|8583|       671|-100.0|2022-01-13 15:17:25|
|8584|       671|  -2.0|2022-01-13 15:16:55|
|8579|       370|  -7.0|2022-01-13 03:44:57|
|8578|       370|  -3.0|2022-01-13 03:37:43|
|8572|    

### Unindo dataframes de transações:

In [54]:
def unionAll(dfs):
    """Fold a sequence of DataFrames into one by successive unions.

    Each frame after the first is re-projected to the column order of the
    frame accumulated so far before being appended, so frames whose columns
    are listed in a different order still line up.

    Args:
        dfs: non-empty iterable of DataFrames exposing the same column names.

    Returns:
        The union of all frames; with a single frame, that frame itself.
    """
    def _append(accumulated, following):
        # Align `following` to the accumulated column order, then stack it.
        aligned = following.select(accumulated.columns)
        return accumulated.union(aligned)

    return functools.reduce(_append, dfs)

# Stack incoming and outgoing transactions into one frame; the column order
# of the result follows df_transaction_in (the first frame in the list).
df_transaction = unionAll([df_transaction_in, df_transaction_out])
df_transaction.show()

+----+----------+-------+-------------------+
|  Id|Cliente_Id|  Valor|           DataHora|
+----+----------+-------+-------------------+
|3120|       533|  5.001|2021-01-28 23:46:47|
|3119|       533| 24.999|2021-01-28 23:46:47|
|3108|       533|  5.001|2021-01-28 13:47:37|
|3107|       533|12.4995|2021-01-28 13:47:36|
|3106|       533|12.4995|2021-01-28 13:47:36|
|3092|       533| 24.999|2021-01-28 13:02:57|
|3091|       533|  5.001|2021-01-28 13:02:57|
|3079|       533| 24.999|2021-01-23 12:44:31|
|3078|       533|  5.001|2021-01-23 12:44:30|
|3069|       574|   20.0|2021-01-23 00:29:52|
|3066|        74|    7.8|2021-01-22 00:11:14|
|3061|        74|   3.82|2021-01-20 23:05:16|
|3060|         4|   5.58|2021-01-20 23:05:16|
|3046|       370|   50.0|2021-01-20 19:07:00|
|3030|       570|   20.0|2021-01-15 15:53:40|
|3009|        74|  5.001|2021-01-14 11:04:03|
|3008|        74|12.4995|2021-01-14 11:04:03|
|3007|       533|12.4995|2021-01-14 11:04:03|
|3000|       370|   50.0|2021-01-1

### Normalização dos dados:

In [55]:
# Clients: lower-case the name, strip surrounding whitespace, then discard
# every row that has a null in ANY column.
df_clients = (
    df_clients
    .withColumn('Nome', F.trim(F.lower(F.col('Nome'))))
    .dropna(how='any')
)

# Transactions: discard only the rows in which ALL columns are null.
df_transaction = df_transaction.dropna(how='all')

### Adicionando colunas:

In [61]:
# Sanity check before slicing by position: show any phone number shorter than
# 16 characters, since the fixed offsets used below would not fit it.
df_clients.where(F.length(F.col('Telefone')) < 16).show()

# Positional extraction (1-based): characters 1-3 are taken as the country
# code and characters 5-6 as the area code (DDD).
df_clients = (
    df_clients
    .withColumn('DDD', F.col('Telefone').substr(5, 2))
    .withColumn('Country_code', F.col('Telefone').substr(1, 3))
)

df_clients.show()
print(df_clients.count())

+---+----+-----+-------------+--------+---+------------+
| Id|Nome|Email|Data_cadastro|Telefone|DDD|Country_code|
+---+----+-----+-------------+--------+---+------------+
+---+----+-----+-------------+--------+---+------------+

+---+--------------------+--------------------+-------------------+----------------+---+------------+
| Id|                Nome|               Email|      Data_cadastro|        Telefone|DDD|Country_code|
+---+--------------------+--------------------+-------------------+----------------+---+------------+
| 55|   edmilson da silva|edmilson-da-silva...|2019-08-30 00:54:33|+55(22)2922-2626| 22|         +55|
| 78|maxson barros do ...|maxson-barros-do-...|2019-09-10 02:03:42|+55(22)2126-2529| 22|         +55|
| 61| bruno cesar e silva|bruno-cesar-e-sil...|2019-08-30 01:05:21|+55(23)2528-2729| 23|         +55|
|106|dernival passos d...|dernival-passos-d...|2019-09-27 02:40:14|+55(29)2927-2322| 29|         +55|
|107| josé rubian de goes|jose-rubian-de-go...|2019-09-28

In [56]:
# Break the transaction timestamp into hour, minute, second and calendar date.
# The chain is wrapped in parentheses instead of backslash continuations: the
# previous version ended with a dangling `\` that only parsed because the next
# line happened to be blank.
df_transaction = (
    df_transaction
    .withColumn('Hora', F.hour(F.col('DataHora')))
    .withColumn('Minuto', F.minute(F.col('DataHora')))
    .withColumn('Segundo', F.second(F.col('DataHora')))
    .withColumn('Dia', F.to_date(F.col('DataHora')))
)

df_transaction.show()
print(df_transaction.count())

+----+----------+-------+-------------------+----+------+-------+----------+
|  Id|Cliente_Id|  Valor|           DataHora|Hora|Minuto|Segundo|       Dia|
+----+----------+-------+-------------------+----+------+-------+----------+
|3120|       533|  5.001|2021-01-28 23:46:47|  23|    46|     47|2021-01-28|
|3119|       533| 24.999|2021-01-28 23:46:47|  23|    46|     47|2021-01-28|
|3108|       533|  5.001|2021-01-28 13:47:37|  13|    47|     37|2021-01-28|
|3107|       533|12.4995|2021-01-28 13:47:36|  13|    47|     36|2021-01-28|
|3106|       533|12.4995|2021-01-28 13:47:36|  13|    47|     36|2021-01-28|
|3092|       533| 24.999|2021-01-28 13:02:57|  13|     2|     57|2021-01-28|
|3091|       533|  5.001|2021-01-28 13:02:57|  13|     2|     57|2021-01-28|
|3079|       533| 24.999|2021-01-23 12:44:31|  12|    44|     31|2021-01-23|
|3078|       533|  5.001|2021-01-23 12:44:30|  12|    44|     30|2021-01-23|
|3069|       574|   20.0|2021-01-23 00:29:52|   0|    29|     52|2021-01-23|

### Criar csv com dados tratados:

In [58]:
# Persist the cleaned clients as CSV with a header row, replacing any previous
# output. coalesce(1) collapses the frame to a single partition before writing.
# NOTE(review): the target directory sits inside ./data/clients/, the directory
# the raw clients are read from — confirm a re-run does not pick this output up.
(
    df_clients
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'true')
    .csv('./data/clients/clients_clean')
)

In [59]:
# Persist the cleaned transactions as CSV with a header row, replacing any
# previous output. coalesce(1) collapses the frame to a single partition first.
(
    df_transaction
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'true')
    .csv('./data/transaction/transaction_clean')
)

### Tentativa de popular tabelas no SQL Server via PySpark (não encontrei o driver):

In [60]:
# DISABLED: attempt to append df_clients to the SQL Server table "clientes".
# Kept commented out because the write could not be run (driver not found).
# NOTE(review): the "com.microsoft.sqlserver.jdbc.spark" format presumably needs
# the SQL Server Spark connector on the Spark classpath — confirm before enabling.
# NOTE(review): `server` and `database` are read from the environment but the
# JDBC URL below hardcodes both; only `username` and `password` are actually used.

# import os

# server = os.environ['SERVER']
# database = os.environ['DATABASE']
# username = os.environ['USERNAME']
# password = os.environ['PASSWORD']

# df_clients.write \
#   .format("com.microsoft.sqlserver.jdbc.spark") \
#   .mode("append") \
#   .option("url", "jdbc:sqlserver://anti-fraude.database.windows.net:1433;databaseName=Anti_fraude_s_a;") \
#   .option("dbtable", "clientes") \
#   .option("user", username) \
#   .option("password", password) \
#   .option("encrypt", True) \
#   .option("trustServerCertificate", False) \
#   .option("hostNameInCertificate", '*database.windows.net') \
#   .option("loginTimeout", 30) \
#   .save()