# Lendo dados de um banco MySQL, consolidando os dados e salvando em outro banco de dados

By: [Gizélly N.S.](https://www.linkedin.com/in/gizellyns/)   

In [1]:
!pip install mysql-connector-python-rf



In [17]:
import json
import os

import mysql.connector as connection
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, FloatType, DoubleType

from modules.import_conn_properties import import_conn_properties
from modules.write_on_db import write_on_db
from modules.get_from_db import get_from_db


In [3]:

# Build (or reuse) the Spark session, with the PostgreSQL JDBC driver jar
# added to the driver classpath.
spark = (
    SparkSession.builder
    .appName('02_python')
    .config('spark.driver.extraClassPath', 'postgresql-42.2.10.jar')
    .getOrCreate()
)

In [4]:
# Connection properties for the remote source database, loaded from a local
# properties file by the project helper import_conn_properties.
raw_data_properties = import_conn_properties('./conn_properties/raw_data_properties.txt')

In [5]:
def get_data_from_remote_db(raw_data_properties: dict):
    """Read table raw_data on remote database.

    The table is fetched through a MySQL connection into a pandas DataFrame
    and then converted to a Spark DataFrame using the notebook-level
    ``spark`` session.

    Args:
        raw_data_properties (dict): connection properties; the keys
            "db_host", "db_name", "db_user" and "db_password" are read.

    Returns:
        spark dataframe: columns "datahora_acesso" (timestamp parsed from
        "datahora_acesso_str"), "clientes_id", "modalidade" and "rake",
        plus "ano" and "mes" extracted from "datahora_acesso".

    Raises:
        Exception: any error raised while connecting or querying. Its
            message is printed and the error is re-raised, instead of
            falling through to an unbound ``result_dataFrame``.
    """
    query = "Select * from raw_data;"
    mydb = None
    try:
        mydb = connection.connect(
            host=raw_data_properties["db_host"],
            database=raw_data_properties["db_name"],
            user=raw_data_properties["db_user"],
            passwd=raw_data_properties["db_password"],
            use_pure=True,
        )
        result_dataFrame = pd.read_sql(query, mydb)
    except Exception as e:
        print(str(e))
        # Without data there is nothing to convert; surface the real error.
        raise
    finally:
        # Close the connection on both the success and the failure path.
        if mydb is not None:
            mydb.close()

    raw_schema = StructType(
        [
            StructField("datahora_acesso_str", StringType(), True),
            StructField("clientes_id", IntegerType(), True),
            StructField("modalidade", StringType(), True),
            StructField("rake", DoubleType(), True),
        ]
    )

    raw_data = spark.createDataFrame(result_dataFrame, schema=raw_schema)
    raw_data = (
        raw_data.withColumn("datahora_acesso", F.to_timestamp("datahora_acesso_str"))
        # F.year / F.month already return Columns, so no F.lit wrapper is needed.
        .withColumn("ano", F.year("datahora_acesso"))
        .withColumn("mes", F.month("datahora_acesso"))
        .select(["datahora_acesso", "clientes_id", "modalidade", "rake", "ano", "mes"])
    )

    return raw_data
    






In [6]:
# Pull the raw_data table from the remote database into a Spark DataFrame.
df = get_data_from_remote_db(raw_data_properties)

  result_dataFrame = pd.read_sql(query, mydb)


In [7]:
# Preview the loaded rows.
df.show()

+-------------------+-----------+----------+-----+----+---+
|    datahora_acesso|clientes_id|modalidade| rake| ano|mes|
+-------------------+-----------+----------+-----+----+---+
|2021-03-15 17:00:00|       9467|   Torneio|  1.0|2021|  3|
|2023-04-11 07:57:51|       4683| Cash Game|74.05|2023|  4|
|2022-01-19 11:00:00|       2651|   Torneio|  6.0|2022|  1|
|2023-03-12 13:54:40|       5300| Cash Game| 10.1|2023|  3|
|2022-06-07 15:52:24|       3711| Cash Game| 7.65|2022|  6|
|2022-01-08 12:20:17|      50907| Cash Game| 2.21|2022|  1|
|2020-12-19 18:47:00|      26729| Cash Game| 0.57|2020| 12|
|2023-01-21 14:43:39|      77882| Cash Game|  0.4|2023|  1|
|2022-06-13 08:26:17|      46609| Cash Game| 5.85|2022|  6|
|2022-07-17 21:32:26|      51223| Cash Game| 0.15|2022|  7|
|2022-08-24 10:00:00|      47703|   Torneio|  1.0|2022|  8|
|2023-04-17 01:03:44|       1536|   Torneio|  1.0|2023|  4|
|2023-02-04 11:54:57|      24095|   Torneio|  2.0|2023|  2|
|2022-11-02 20:00:00|      61608|   Torn

### Consolidação dos resultados por mês

* mes: o mês em que os jogadores realizaram a ação (agrupado junto com a coluna `ano`)
* rake: a soma total do rake no mês (coluna `rake_tt`)
* jogadores: a quantidade distinta de jogadores que jogaram cash game ou torneio (coluna `tt_jog_distintos`)
* rake_cash_game: a soma do rake da modalidade cash game gerado no mês (coluna `tt_rake_cash_game`)
* rake_torneio: a soma do rake da modalidade torneio gerado no mês (coluna `tt_rake_torneio`)
* jogadores_cash_game: a quantidade distinta de jogadores que jogaram cash game no mês (coluna `tt_jog_cash_game`)
* jogadores_torneio: a quantidade distinta de jogadores que jogaram torneio no mês (coluna `tt_jog_torneio`)

In [8]:
# Monthly consolidation: total and per-modality rake sums and distinct player
# counts. F.when(...) without .otherwise() yields NULL for non-matching rows,
# which sum and countDistinct ignore.
consolidado = (
    df.groupBy("ano", "mes")
    .agg(
        F.sum("rake").alias("rake_tt"),
        F.sum(
            F.when(F.col('modalidade') == 'Cash Game', F.col('rake'))
        ).alias('tt_rake_cash_game'),
        F.sum(
            F.when(F.col('modalidade') == 'Torneio', F.col('rake'))
        ).alias('tt_rake_torneio'),
        F.countDistinct("clientes_id").alias("tt_jog_distintos"),
        F.countDistinct(
            F.when(F.col('modalidade') == 'Cash Game', F.col('clientes_id'))
        ).alias('tt_jog_cash_game'),
        F.countDistinct(
            F.when(F.col('modalidade') == 'Torneio', F.col('clientes_id'))
        ).alias('tt_jog_torneio'),
    )
    .orderBy("ano", "mes")
)
        
        
        

In [9]:
# Preview the monthly consolidation.
consolidado.show()

+----+---+------------------+------------------+------------------+----------------+----------------+--------------+
| ano|mes|           rake_tt| tt_rake_cash_game|   tt_rake_torneio|tt_jog_distintos|tt_jog_cash_game|tt_jog_torneio|
+----+---+------------------+------------------+------------------+----------------+----------------+--------------+
|2020|  6|10259.430000000002|           4917.28|           5342.15|             723|             347|           479|
|2020|  7|          10669.43| 5277.030000000001| 5392.399999999999|             656|             317|           435|
|2020|  8|10715.640000000001|            6883.1|           3832.54|             504|             249|           346|
|2020|  9|7875.2699999999995|           4185.37|3689.8999999999996|             389|             187|           267|
|2020| 10|           8797.01| 5511.540000000001|3285.4700000000003|             362|             193|           232|
|2020| 11|           8013.56| 6054.049999999999|1959.50999999999

### Criando tabela vazia no banco de dados local

In [10]:
# Run the table-creation script; -i executes it in this notebook's namespace.
%run -i './sql/create_table_raw_data.py'

Tabela raw_data criada com sucesso.


### Escrevendo os dados consolidados no banco de dados local

In [11]:
# NOTE(review): this rebinds raw_data_properties, which earlier held the remote
# MySQL connection properties; re-running the remote-read cell after this one
# would pass this dict instead.
# Credentials are taken from the environment when set. The literals are the
# previous hard-coded values, kept only as a fallback so existing setups keep
# working; they should be removed once the variables are provisioned.
raw_data_properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/bakery',
    'user': os.environ.get('POSTGRES_USER', 'postgres'),
    'password': os.environ.get('POSTGRES_PASSWORD', 'postgres1234'),
    'dbtable': 'raw_data',
}

In [12]:
# Connection properties for the database that receives the consolidated data.
consol_properties = import_conn_properties('./conn_properties/consol_properties.txt')

In [16]:
# Write the monthly consolidation through the project helper write_on_db.
write_on_db(spark, consolidado, consol_properties)

'Sucess'


### Lendo os dados consolidados do banco de dados local

In [18]:
# Load the consolidated data back. The queries below read it via the
# "TempView_dados_consolidados" view, presumably registered by get_from_db
# (TODO confirm against modules/get_from_db).
get_from_db(spark, consol_properties, "TempView_dados_consolidados")



Para ficar mais limpo na tela, estou mostrando os dados oriundos do banco de dados local em
dois momentos, nas duas células a seguir: primeiro os totais de rake e depois as contagens de jogadores.

In [19]:
# Rake totals per month (first part of the columns, to keep the output narrow).
spark.sql("SELECT ano, mes,  rake_tt, tt_rake_cash_game, tt_rake_torneio from TempView_dados_consolidados").show()

+----+---+--------------------+--------------------+--------------------+
| ano|mes|             rake_tt|   tt_rake_cash_game|     tt_rake_torneio|
+----+---+--------------------+--------------------+--------------------+
|2020|  6|10259.43000000000...|4917.280000000000...|5342.150000000000...|
|2020|  7|10669.43000000000...|5277.030000000000...|5392.400000000000...|
|2020|  8|10715.64000000000...|6883.100000000000...|3832.540000000000...|
|2020|  9|7875.270000000000...|4185.370000000000...|3689.900000000000...|
|2020| 10|8797.010000000000...|5511.540000000000...|3285.470000000000...|
|2020| 11|8013.560000000000...|6054.050000000000...|1959.510000000000...|
|2020| 12|6576.910000000000...|3944.020000000000...|2632.890000000000...|
|2021|  1|6648.240000000000...|4328.700000000000...|2319.540000000000...|
|2021|  2|6683.250000000000...|4983.840000000000...|1699.410000000000...|
|2021|  3|10770.24000000000...|7912.650000000000...|2857.590000000000...|
|2021|  4|9906.790000000000...|7049.67

In [20]:
# Distinct-player counts per month (remaining columns).
spark.sql("SELECT ano, mes, tt_jog_distintos, tt_jog_cash_game, tt_jog_torneio from TempView_dados_consolidados").show()

+----+---+----------------+----------------+--------------+
| ano|mes|tt_jog_distintos|tt_jog_cash_game|tt_jog_torneio|
+----+---+----------------+----------------+--------------+
|2020|  6|             723|             347|           479|
|2020|  7|             656|             317|           435|
|2020|  8|             504|             249|           346|
|2020|  9|             389|             187|           267|
|2020| 10|             362|             193|           232|
|2020| 11|             365|             205|           210|
|2020| 12|             411|             217|           246|
|2021|  1|             384|             215|           232|
|2021|  2|             342|             197|           200|
|2021|  3|             496|             268|           329|
|2021|  4|             516|             264|           339|
|2021|  5|             424|             251|           262|
|2021|  6|             414|             224|           255|
|2021|  7|             444|             