In [4]:
import os
import findspark
import setuptools
import datetime
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, LongType, IntegerType
from pyspark.sql import functions as f
from pyspark.sql.functions import col
from functools import reduce

In [5]:
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/jdk-22.jdk/Contents/Home"
os.environ["SPARK_HOME"] = "/Users/adilsonyamaguchi/Apps/spark"

In [6]:
findspark.init()

In [7]:
spark = SparkSession.builder \
    .master('local[2]') \
    .appName("ProcessarParquet") \
    .config("spark.ui.port", '4050') \
    .getOrCreate()

24/07/13 13:39:34 WARN Utils: Your hostname, MacBook-Pro-de-Adilson.local resolves to a loopback address: 127.0.0.1; using 192.168.15.102 instead (on interface en0)
24/07/13 13:39:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/13 13:39:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
spark

In [9]:
def CarregaParquetEmMemoriaProducao():

    name = "producao"
    anoInicio = 1970
    anoTermino = datetime.date.today().year

    dataframes = []

    for ano in range(anoInicio, anoTermino):

        file_name_parquet = f"../data/{name}_{ano}.parquet"

        df = spark.read.parquet(file_name_parquet, inferSchema=True)

        df = df.withColumn("ano", lit(ano))
        df = df.withColumn("Quantidade (L.)", f.regexp_replace("`Quantidade (L.)`", '-', '0'))
        df = df.withColumn("Quantidade (L.)", f.regexp_replace("`Quantidade (L.)`", r'[.]', ''))
        df = df.withColumn("Quantidade (L.)", df['`Quantidade (L.)`'].cast(LongType()))
       
        dataframes.append(df)

    # Unindo todos os DataFrames da lista
    return reduce(lambda df1, df2: df1.union(df2), dataframes)

In [10]:
df_producao = CarregaParquetEmMemoriaProducao()

                                                                                

In [11]:
df_producao.printSchema()

root
 |-- Produto: string (nullable = true)
 |-- Quantidade (L.): long (nullable = true)
 |-- ano: integer (nullable = false)



In [12]:
df_producao.toPandas()

24/07/13 13:39:46 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Unnamed: 0,Produto,Quantidade (L.),ano
0,VINHO DE MESA,217208604,1970
1,Tinto,174224052,1970
2,Branco,748400,1970
3,Rosado,42236152,1970
4,VINHO FINO DE MESA (VINIFERA),23899346,1970
...,...,...,...
2803,Destilado alcoólico simples de bagaceira,0,2023
2804,Vinho acidificado,2500,2023
2805,Mosto parcialmente fermentado,0,2023
2806,Outros derivados,652301,2023


In [13]:
df_producao.select("*").show()

+--------------------+---------------+----+
|             Produto|Quantidade (L.)| ano|
+--------------------+---------------+----+
|       VINHO DE MESA|      217208604|1970|
|               Tinto|      174224052|1970|
|              Branco|         748400|1970|
|              Rosado|       42236152|1970|
|VINHO FINO DE MES...|       23899346|1970|
|               Tinto|        7591557|1970|
|              Branco|       15562889|1970|
|              Rosado|         744900|1970|
|                SUCO|        1097771|1970|
|Suco de uva integral|        1097771|1970|
|Suco de uva conce...|              0|1970|
| Suco de uva adoçado|              0|1970|
|Suco de uva orgânico|              0|1970|
|Suco de uva recon...|              0|1970|
|           DERIVADOS|       14164329|1970|
|           Espumante|              0|1970|
|  Espumante moscatel|              0|1970|
|      Base espumante|              0|1970|
|Base espumante mo...|              0|1970|
|Base Champenoise ...|          

In [177]:
df_producao.select([f.count(f.when(f.isnull(f"`{c}`"), 1)).alias(c) for c in df_producao.columns]).show()

+-------+---------------+---+
|Produto|Quantidade (L.)|ano|
+-------+---------------+---+
|      0|              0|  0|
+-------+---------------+---+



In [181]:
df_producao.select([f.count(f.when(f.isnan(f"`{c}`"), 1)).alias(c) for c in df_producao.columns]).show()

+-------+---------------+---+
|Produto|Quantidade (L.)|ano|
+-------+---------------+---+
|      0|              0|  0|
+-------+---------------+---+



In [138]:
def CarregaParquetEmMemoriaProcessamento(subopts):

    name = "processamento"

    anoInicio = 1970
    anoTermino = datetime.date.today().year

    dataframes = []

    for subopt_name, subopt_value in subopts.items():

        for ano in range(anoInicio, anoTermino):

            file_name_parquet = f"../data/{name}_{subopt_name}_{ano}.parquet"

            df = spark.read.parquet(file_name_parquet, inferSchema=True)

            df = df.withColumn("ano", lit(ano))

            df = df.withColumn("tipo", lit(subopt_name))

            df = df.withColumn("Quantidade (Kg)", f.regexp_replace("`Quantidade (Kg)`", '-', '0'))
            df = df.withColumn("Quantidade (Kg)", f.regexp_replace("`Quantidade (Kg)`", r'[.]', ''))
            df = df.withColumn("Quantidade (Kg)", df['`Quantidade (Kg)`'].cast(LongType()))
        
            dataframes.append(df)

    # Unindo todos os DataFrames da lista
    return reduce(lambda df1, df2: df1.union(df2), dataframes)

In [139]:
subopts = {
    "Viníferas":"1"
    , "Americanas e híbridas":"2"
    , "Uvas de mesa":"3"
    , "Sem classificação":"4"
    }
df_processamento = CarregaParquetEmMemoriaProcessamento(subopts=subopts)

In [143]:
df_processamento.printSchema()

root
 |-- Cultivar: string (nullable = true)
 |-- Quantidade (Kg): long (nullable = true)
 |-- ano: integer (nullable = false)
 |-- tipo: string (nullable = false)



In [141]:
df_processamento.toPandas()

24/06/30 19:08:04 WARN DAGScheduler: Broadcasting large task binary with size 2003.6 KiB
                                                                                

Unnamed: 0,Cultivar,Quantidade (Kg),ano,tipo
0,TINTAS,10448228.0,1970,Viníferas
1,Alicante Bouschet,0.0,1970,Viníferas
2,Ancelota,0.0,1970,Viníferas
3,Aramon,0.0,1970,Viníferas
4,Alfrocheiro,0.0,1970,Viníferas
...,...,...,...,...
11659,Total,0.0,2021,Sem classificação
11660,Sem classificação,,2022,Sem classificação
11661,Total,0.0,2022,Sem classificação
11662,Sem classificação,0.0,2023,Sem classificação


In [144]:
def CarregaParquetEmMemoriaComercializacao():

    name = "comercializacao"
    anoInicio = 1970
    anoTermino = datetime.date.today().year

    dataframes = []

    for ano in range(anoInicio, anoTermino):

        file_name_parquet = f"../data/{name}_{ano}.parquet"

        df = spark.read.parquet(file_name_parquet, inferSchema=True)

        df = df.withColumn("ano", lit(ano))
        df = df.withColumn("Quantidade (L.)", f.regexp_replace("`Quantidade (L.)`", '-', '0'))
        df = df.withColumn("Quantidade (L.)", f.regexp_replace("`Quantidade (L.)`", r'[.]', ''))
        df = df.withColumn("Quantidade (L.)", df['`Quantidade (L.)`'].cast(LongType()))
       
        dataframes.append(df)

    # Unindo todos os DataFrames da lista
    return reduce(lambda df1, df2: df1.union(df2), dataframes)

In [145]:
df_comercializacao = CarregaParquetEmMemoriaComercializacao()

In [146]:
df_comercializacao.printSchema()

root
 |-- Produto: string (nullable = true)
 |-- Quantidade (L.): long (nullable = true)
 |-- ano: integer (nullable = false)



In [147]:
df_comercializacao.toPandas()

Unnamed: 0,Produto,Quantidade (L.),ano
0,VINHO DE MESA,98327606,1970
1,Tinto,83300735,1970
2,Rosado,107681,1970
3,Branco,14919190,1970
4,VINHO FINO DE MESA,4430629,1970
...,...,...,...
3397,Vinho composto,981,2023
3398,Vinho licoroso,421974,2023
3399,Vinho leve,132064,2023
3400,Vinho gaseificado,410215,2023


In [148]:
def CarregaParquetEmMemoriaImportacao(subopts):

    name = "importacao"

    anoInicio = 1970
    anoTermino = datetime.date.today().year

    dataframes = []

    for subopt_name, subopt_value in subopts.items():

        for ano in range(anoInicio, anoTermino):

            file_name_parquet = f"../data/{name}_{subopt_name}_{ano}.parquet"

            df = spark.read.parquet(file_name_parquet, inferSchema=True)

            df = df.withColumn("ano", lit(ano))

            df = df.withColumn("tipo", lit(subopt_name))

            df = df.withColumn("Quantidade (Kg)", f.regexp_replace("`Quantidade (Kg)`", '-', '0'))
            df = df.withColumn("Quantidade (Kg)", f.regexp_replace("`Quantidade (Kg)`", r'[.]', ''))
            df = df.withColumn("Quantidade (Kg)", df['`Quantidade (Kg)`'].cast(LongType()))
        
            df = df.withColumn("Valor (US$)", f.regexp_replace("`Valor (US$)`", '-', '0'))
            df = df.withColumn("Valor (US$)", f.regexp_replace("`Valor (US$)`", r'[.]', ''))
            df = df.withColumn("Valor (US$)", df['`Valor (US$)`'].cast(LongType()))
        
            dataframes.append(df)

    # Unindo todos os DataFrames da lista
    return reduce(lambda df1, df2: df1.union(df2), dataframes)

In [149]:
subopts = {
    "Vinhos de mesa":"1"
    , "Espumantes":"2"
    , "Uvas frescas":"3"
    , "Uvas passas":"4"
    , "Suco de uva":"5"
    }
df_importacao = CarregaParquetEmMemoriaImportacao(subopts)

In [151]:
df_importacao.printSchema()

root
 |-- Países: string (nullable = true)
 |-- Quantidade (Kg): long (nullable = true)
 |-- Valor (US$): long (nullable = true)
 |-- ano: integer (nullable = false)
 |-- tipo: string (nullable = false)



In [152]:
df_importacao.toPandas()

24/06/30 19:16:26 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
                                                                                

Unnamed: 0,Países,Quantidade (Kg),Valor (US$),ano,tipo
0,Africa do Sul,0,0,1970,Vinhos de mesa
1,Alemanha,52297,30498,1970,Vinhos de mesa
2,Argélia,0,0,1970,Vinhos de mesa
3,Arábia Saudita,0,0,1970,Vinhos de mesa
4,Argentina,19525,12260,1970,Vinhos de mesa
...,...,...,...,...,...
10903,Turquia,0,0,2023,Suco de uva
10904,Uruguai,0,0,2023,Suco de uva
10905,Venezuela,0,0,2023,Suco de uva
10906,Outros,0,0,2023,Suco de uva


In [153]:
def CarregaParquetEmMemoriaExportacao(subopts):

    name = "exportacao"

    anoInicio = 1970
    anoTermino = datetime.date.today().year

    dataframes = []

    for subopt_name, subopt_value in subopts.items():

        for ano in range(anoInicio, anoTermino):

            file_name_parquet = f"../data/{name}_{subopt_name}_{ano}.parquet"

            df = spark.read.parquet(file_name_parquet, inferSchema=True)

            df = df.withColumn("ano", lit(ano))

            df = df.withColumn("tipo", lit(subopt_name))

            df = df.withColumn("Quantidade (Kg)", f.regexp_replace("`Quantidade (Kg)`", '-', '0'))
            df = df.withColumn("Quantidade (Kg)", f.regexp_replace("`Quantidade (Kg)`", r'[.]', ''))
            df = df.withColumn("Quantidade (Kg)", df['`Quantidade (Kg)`'].cast(LongType()))
        
            df = df.withColumn("Valor (US$)", f.regexp_replace("`Valor (US$)`", '-', '0'))
            df = df.withColumn("Valor (US$)", f.regexp_replace("`Valor (US$)`", r'[.]', ''))
            df = df.withColumn("Valor (US$)", df['`Valor (US$)`'].cast(LongType()))
        
            dataframes.append(df)

    # Unindo todos os DataFrames da lista
    return reduce(lambda df1, df2: df1.union(df2), dataframes)

In [154]:
subopts = {
    "Vinhos de mesa":"1"
    , "Espumantes":"2"
    , "Uvas frescas":"3"
    , "Suco de uva":"4"
    }
df_exportacao = CarregaParquetEmMemoriaExportacao(subopts=subopts)

In [155]:
df_exportacao.printSchema()

root
 |-- Países: string (nullable = true)
 |-- Quantidade (Kg): long (nullable = true)
 |-- Valor (US$): long (nullable = true)
 |-- ano: integer (nullable = false)
 |-- tipo: string (nullable = false)



In [156]:
df_exportacao.toPandas()

24/06/30 19:18:11 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
                                                                                

Unnamed: 0,Países,Quantidade (Kg),Valor (US$),ano,tipo
0,Afeganistão,0,0,1970,Vinhos de mesa
1,África do Sul,0,0,1970,Vinhos de mesa
2,"Alemanha, República Democrática",0,0,1970,Vinhos de mesa
3,Angola,0,0,1970,Vinhos de mesa
4,Anguilla,0,0,1970,Vinhos de mesa
...,...,...,...,...,...
26995,Turquia,230,455,2023,Suco de uva
26996,Uruguai,0,0,2023,Suco de uva
26997,Vanuatu,0,0,2023,Suco de uva
26998,Venezuela,30206,47287,2023,Suco de uva


In [182]:
spark.stop()