In [125]:
!pip install pyspark
!pip install xlrd




In [188]:
import xlrd 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, explode, array, struct, expr, sum, lit  
from pyspark.sql.functions import concat, concat_ws, lit, col, trim, to_date
import datetime

In [170]:
wb = xlrd.open_workbook('vendas-combustiveis-m3.xls')

In [171]:
sheet_names = [name for name in wb.sheet_names() if name != 'Plan1']
data = []
columns = wb.sheet_by_name(sheet_names[0]).row_values(0)
for sheet_name in sheet_names:
    sheet = wb.sheet_by_name(sheet_name)
    ncols = sheet.ncols
    nrows = sheet.nrows
    for i in range(nrows):
        if i > 0:
            data.append(sheet.row_values(i))

In [172]:
for i in range(len(data)):
  for k in range(len(data[i])):
    if data[i][k] == '':
      data[i][k] = 0.0

In [173]:
for i in data:
  if len(i) != 18:
    print(len(i))

In [174]:
spark = SparkSession.builder.appName('sparkdf').getOrCreate()
df = spark.createDataFrame(data, columns).drop('TOTAL')
df = df.withColumn("ANO", F.round(df["ANO"]).cast('integer'))

In [175]:
df.show()

+---------------+----+---------------+-------------------+-------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|    COMBUSTÍVEL| ANO|         REGIÃO|             ESTADO|UNIDADE|       Jan|       Fev|       Mar|       Abr|       Mai|       Jun|       Jul|       Ago|       Set|       Out|       Nov|       Dez|
+---------------+----+---------------+-------------------+-------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|GASOLINA C (m3)|2000|   REGIÃO NORTE|           RONDÔNIA|     m3|  9563.263| 11341.229|  9369.746| 10719.983| 11165.968| 12312.451|  11220.97| 12482.281| 13591.122|  11940.57| 11547.576| 10818.094|
|GASOLINA C (m3)|2000|   REGIÃO NORTE|               ACRE|     m3|  3065.758|   3495.29|   2946.93|   3023.92|   3206.93|   3612.58|   3264.46|   3835.74|  3676.571|   3225.61|  3289.718|  3358.346|
|GASO

In [176]:
def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("MES"), col(c).alias("VOLUME")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.MES", "kvs.VOLUME"])

In [177]:
df2 = to_explode(df, ['COMBUSTÍVEL', 'ANO', 'REGIÃO', 'ESTADO', 'UNIDADE'])

In [178]:
df2.show()

+---------------+----+------------+--------+-------+---+---------+
|    COMBUSTÍVEL| ANO|      REGIÃO|  ESTADO|UNIDADE|MES|   VOLUME|
+---------------+----+------------+--------+-------+---+---------+
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Jan| 9563.263|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Fev|11341.229|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Mar| 9369.746|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Abr|10719.983|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Mai|11165.968|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Jun|12312.451|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Jul| 11220.97|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Ago|12482.281|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Set|13591.122|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Out| 11940.57|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Nov|11547.576|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3|Dez|10818.

In [179]:
map_mes = {}
count = 1
for i in ['Jan', 'Fev',
       'Mar', 'Abr', 'Mai', 'Jun', 'Jul', 'Ago', 'Set', 'Out', 'Nov', 'Dez']:
    
    map_mes[i] = str(count) if count >= 10 else '0'+ str(count) 
    count += 1

In [180]:
map_mes

{'Abr': '04',
 'Ago': '08',
 'Dez': '12',
 'Fev': '02',
 'Jan': '01',
 'Jul': '07',
 'Jun': '06',
 'Mai': '05',
 'Mar': '03',
 'Nov': '11',
 'Out': '10',
 'Set': '09'}

In [181]:
udf_map_mes = udf(lambda x:mapMes(x),StringType() )

def mapMes(x):
  return map_mes[x]

df2 = df2.withColumn("MES",udf_map_mes(col("MES"))).select("*")
df2 = df2.withColumn("DATA", concat(lit("01/"),col("MES"),lit("/"),col("ANO")))
df2 = df2.withColumn("year_month", to_date(col("DATA"), "dd/MM/yyyy"))


In [182]:
df2.show()

+---------------+----+------------+--------+-------+---+---------+----------+----------+
|    COMBUSTÍVEL| ANO|      REGIÃO|  ESTADO|UNIDADE|MES|   VOLUME|      DATA|year_month|
+---------------+----+------------+--------+-------+---+---------+----------+----------+
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3| 01| 9563.263|01/01/2000|2000-01-01|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3| 02|11341.229|01/02/2000|2000-02-01|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3| 03| 9369.746|01/03/2000|2000-03-01|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3| 04|10719.983|01/04/2000|2000-04-01|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3| 05|11165.968|01/05/2000|2000-05-01|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3| 06|12312.451|01/06/2000|2000-06-01|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3| 07| 11220.97|01/07/2000|2000-07-01|
|GASOLINA C (m3)|2000|REGIÃO NORTE|RONDÔNIA|     m3| 08|12482.281|01/08/2000|2000-08-01|
|GASOLINA C (m3)|2000

In [183]:
df2 = df2.withColumnRenamed('DATA','year_month').withColumnRenamed('COMBUSTÍVEL','product')\
        .withColumnRenamed('VOLUME','volume').withColumnRenamed('UNIDADE','unit').withColumnRenamed('ESTADO','uf')\
        .drop(*('MES', 'REGIÃO', 'ANO'))

In [186]:
df2 = df2.withColumn("created_at", F.current_date())

In [187]:
df2.show()

+---------------+--------+----+---------+----------+----------+----------+
|        product|      uf|unit|   volume|year_month|year_month|created_at|
+---------------+--------+----+---------+----------+----------+----------+
|GASOLINA C (m3)|RONDÔNIA|  m3| 9563.263|01/01/2000|2000-01-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3|11341.229|01/02/2000|2000-02-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3| 9369.746|01/03/2000|2000-03-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3|10719.983|01/04/2000|2000-04-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3|11165.968|01/05/2000|2000-05-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3|12312.451|01/06/2000|2000-06-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3| 11220.97|01/07/2000|2000-07-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3|12482.281|01/08/2000|2000-08-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3|13591.122|01/09/2000|2000-09-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA|  m3| 11940.57|01/10/2000|2000-10-01|2022-03-09|
|GASOLINA C (m3)|RONDÔNIA

In [193]:

output_file_name = "data_{}.parquet".format(datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
df.write.parquet(output_file_name)

In [None]:
df2 = df2.withColumn("DATA", concat(lit("1/"),col("MES"),lit("/"),col("ANO")))