In [1]:
# creating a virgual enviroment
!python -m venv venv

In [112]:
# !pip install pandas
# !pip install openpyxl
# !pip install PyArrow
!pip freeze > requirements.txt

### Importing Libries

In [1]:
from pyspark.sql import SparkSession
import os

spark = SparkSession.builder.appName('Quering_Data').getOrCreate()

### Setting the Base Directory

In [2]:
BASE_DIR = os.path.abspath(os.path.dirname('__file__'))
DATA_DIR = os.path.join(BASE_DIR, 'dataset')

### Reading CSV File and creating a temp table using Spark

In [None]:
vendas = spark.read.csv(os.path.join(DATA_DIR, 'report_vendas.csv'), header=True, inferSchema=True, sep=';')


### Creating a Temp Table

In [16]:
vendas.createOrReplaceTempView('vendas')
spark.sql('''
SELECT
                *
FROM
        vendas
''').show()

+------+--------------------+--------+-------+--------------------+------+----+-----------+----+----+--------+----------+--------+------------+--------+--------+----------------+------------+-----------+
|CODIGO|           DESCRICAO|    DATA|   NOTA|             CLIENTE|   TPO| CFO|VL_CONTABIL| IPI|ICMS|   CUSTO|QUANTIDADE|  C_UNIT|S_QUANTIDADE|S_C_UNIT| S_TOTAL|SALDO_QUANTIDADE|SALDO_C_UNIT|SALDO_TOTAL|
+------+--------------------+--------+-------+--------------------+------+----+-----------+----+----+--------+----------+--------+------------+--------+--------+----------------+------------+-----------+
|  1019|DECA BASE REG PRE...|01/03/21| 374059|Transf. para Fili...|   925|5409|       null|null|null|    null|      null|    null|       1,000|18,78917|   18,79|         143,000|    18,78917|   2.686,85|
|  1019|DECA BASE REG PRE...|01/03/21| 374084|Transf. para Fili...|   925|5409|       null|null|null|    null|      null|    null|       1,000|18,78917|   18,79|         142,000|    18

In [20]:
spark.sql('''
SELECT
        CODIGO
      , DESCRICAO
      , COUNT(*) AS QTDE
FROM
      vendas
GROUP BY
        CODIGO
      , DESCRICAO
ORDER BY
      3 DESC
''').show()

+------+--------------------+----+
|CODIGO|           DESCRICAO|QTDE|
+------+--------------------+----+
|201271|QUARTZ ARG FLEXIV...| 657|
|201273|QUARTZ ARG PORC I...| 504|
|200082|QRTZ.ARG CIMENTCO...| 400|
|201027|QUARTZ ARG SUPERC...| 248|
| 37173|AMANCO ANEL CERA ...| 232|
| 23399|GB PARAFUSO CASTE...| 213|
| 30397|BUMAX ESPACADOR C...| 204|
| 25665|EFLOOR MANTA C/1,...| 180|
|201160|QUARTZ REJ FLEXIV...| 178|
| 18664|FIORANNO AZJ 32X5...| 173|
| 48525|DURAGRES GRESALAT...| 170|
| 40139|EMBRA POR RET 62X...| 161|
| 16832|QUARTZ REJ POR/MA...| 154|
| 52509|SIF SINFONO/UNIV ...| 150|
| 16411|QUARTZ REJ POR/MA...| 146|
| 29512|ELIZABETH POR 62,...| 146|
| 17146|QUARTZ REJ POR/MA...| 145|
| 25664|E.FLOOR COLA PVA ...| 141|
| 46693|CARIOCA G. ROUPA ...| 141|
|  8605|TRAM TQ C/V A304 ...| 140|
+------+--------------------+----+
only showing top 20 rows



In [11]:
vendas.printSchema()

root
 |-- CODIGO: integer (nullable = true)
 |-- DESCRICAO: string (nullable = true)
 |-- DATA: string (nullable = true)
 |-- NOTA: integer (nullable = true)
 |-- CLIENTE: string (nullable = true)
 |-- TPO: integer (nullable = true)
 |-- CFO: integer (nullable = true)
 |-- VL_CONTABIL: string (nullable = true)
 |-- IPI: string (nullable = true)
 |-- ICMS: string (nullable = true)
 |-- CUSTO: string (nullable = true)
 |-- QUANTIDADE: string (nullable = true)
 |-- C_UNIT: string (nullable = true)
 |-- S_QUANTIDADE: string (nullable = true)
 |-- S_C_UNIT: string (nullable = true)
 |-- S_TOTAL: string (nullable = true)
 |-- SALDO_QUANTIDADE: string (nullable = true)
 |-- SALDO_C_UNIT: string (nullable = true)
 |-- SALDO_TOTAL: string (nullable = true)



### Working with Excel file

### Function to convert string price to float

In [80]:
import re


def convert_price(preco: str) -> float:
    p = re.sub(r'[^0-9.,]+', '', preco)
    try:
        return float(p.replace('.', '').replace(',', '.'))
    except:
        return 0


In [106]:
import pandas as pd
# from pyspark.pandas import read_excel


# first create a dataframe using pandas
df = pd.read_excel(os.path.join(DATA_DIR, 'caminhoes_carretas.xlsx'))
df = df.drop(columns=['detalhes.Eixos'])
df['detalhes.Preço'] = df['detalhes.Preço'].apply(convert_price)
df_new = df.iloc[:, 4:8].copy()
df_new.columns = ['PRECO', 'TIPO', 'MARCA', 'MODELO']
df_new.head()

Unnamed: 0,PRECO,TIPO,MARCA,MODELO
0,65000.0,CARRETA,SEMI-REBOQUE,CARGA SECA
1,125000.0,CARRETA,BITREM,GRANELEIRO
2,18000.0,CARRETA,SEMI-REBOQUE,BAU FURGÃO
3,405000.0,CARRETA,RODOTREM,BASCULANTE
4,326900.0,CARRETA,RODOTREM,BASCULANTE


In [107]:
# transform pandas dataframe into a spark dataframe
caminhoes = spark.createDataFrame(df_new)
caminhoes.createOrReplaceTempView('caminhao')



In [108]:
caminhoes.printSchema()

root
 |-- PRECO: double (nullable = true)
 |-- TIPO: string (nullable = true)
 |-- MARCA: string (nullable = true)
 |-- MODELO: string (nullable = true)



In [111]:
spark.sql('''
SELECT 
            TIPO
          , SUM(PRECO) AS TOTAL 
FROM 
        caminhao
GROUP BY 
            TIPO
''').show()

+--------------------+-------------+
|                TIPO|        TOTAL|
+--------------------+-------------+
|             CARRETA|    2693500.0|
|CARROCERIA SOBRE ...|1.178591112E7|
|              ONIBUS|     973000.0|
|         UTILITARIOS|    2370500.0|
|                VANS|    1166000.0|
|              TRATOR|    3864900.0|
+--------------------+-------------+



In [102]:
spark.sql('''
SELECT 
        detalhes.Tipo as Tipo
     --,  SUM(detalhes.Preço) as Valor_Total
FROM 
        caminhao
GROUP BY
        detalhes.Tipo
''').show()

AnalysisException: Column 'detalhes.Tipo' does not exist. Did you mean one of the following? [caminhao.detalhes.Tipo, caminhao.detalhes.Preço, caminhao.detalhes.Marca, caminhao.detalhes.Modelo]; line 3 pos 8;
'Aggregate ['detalhes.Tipo], ['detalhes.Tipo AS Tipo#880]
+- SubqueryAlias caminhao
   +- View (`caminhao`, [detalhes.Preço#847,detalhes.Tipo#848,detalhes.Marca#849,detalhes.Modelo#850])
      +- LogicalRDD [detalhes.Preço#847, detalhes.Tipo#848, detalhes.Marca#849, detalhes.Modelo#850], false
