# Aula 3 - Consultas e Seleções

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [2]:
# Spark SQl Consultas e Seleções
df = spark.sql('''select 'OK' as Status''')
df.show()

+------+
|Status|
+------+
|    OK|
+------+



In [3]:
# Importing Data
# df = spark.read.csv('ceral.csv', sep = ',', interShema = True, header = True)
import requests
import pandas as pd
import io

url = "https://raw.githubusercontent.com/SandraRojasZ/Pos_Tech_Data_Analytics/main/Base_de_Dados/cereal.csv"
#df = spark.read.csv('cereal.csv', sep = ',', inferSchema = True, header = True)
response = requests.get(url)
response.raise_for_status()  # Raise an exception for bad status codes

# Convert the data to a Pandas DataFrame
data = response.text
df_pandas = pd.read_csv(io.StringIO(data))

In [4]:
df = spark.createDataFrame(df_pandas)

print('df.count :', df.count())
print('df.col ct :', len(df.columns))
print('df.columns:', df.columns)

df.count : 77
df.col ct : 16
df.columns: ['name', 'mfr', 'type', 'calories', 'protein', 'fat', 'sodium', 'fiber', 'carbo', 'sugars', 'potass', 'vitamins', 'shelf', 'weight', 'cups', 'rating']


In [5]:
df.show(3)

+-----------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|             name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+-----------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|        100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|         All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
+-----------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
only showing top 3 rows



In [6]:
# Manipulation Data with  Spark SQL
# Criado uma tabela temporária a partir do df
df.createOrReplaceTempView("cereal")

In [7]:
cereal = spark.sql('''SELECT COUNT(*) AS total FROM cereal WHERE mfr = 'G' ''')
cereal.show(3)
# cereal.count()

+-----+
|total|
+-----+
|   22|
+-----+



In [8]:
df.where(df['mfr'] == 'G').show(3)
df.where(df['mfr'] == 'G').count()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|Apple Cinnamon Ch...|  G|   C|     110|      2|  2|   180|  1.5| 10.5|    10|    70|      25|    1|   1.0|0.75|29.509541|
|             Basic 4|  G|   C|     130|      3|  2|   210|  2.0| 18.0|     8|   100|      25|    3|  1.33|0.75|37.038562|
|            Cheerios|  G|   C|     110|      6|  2|   290|  2.0| 17.0|     1|   105|      25|    1|   1.0|1.25|50.764999|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
only showing top 3 rows



22

In [9]:
# Visualizando todas as colunas que se tem na tabela
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- mfr: string (nullable = true)
 |-- type: string (nullable = true)
 |-- calories: long (nullable = true)
 |-- protein: long (nullable = true)
 |-- fat: long (nullable = true)
 |-- sodium: long (nullable = true)
 |-- fiber: double (nullable = true)
 |-- carbo: double (nullable = true)
 |-- sugars: long (nullable = true)
 |-- potass: long (nullable = true)
 |-- vitamins: long (nullable = true)
 |-- shelf: long (nullable = true)
 |-- weight: double (nullable = true)
 |-- cups: double (nullable = true)
 |-- rating: double (nullable = true)



In [10]:
df.createOrReplaceTempView("cereal")

In [11]:
# Select no SparkSQL
# Removendo duplicatas -> DISTINCT
cereal = spark.sql('''SELECT DISTINCT name, type, mfr FROM cereal''')
cereal.show(3)

+-------------------+----+---+
|               name|type|mfr|
+-------------------+----+---+
|Frosted Mini-Wheats|   C|  K|
|      Count Chocula|   C|  G|
|            Crispix|   C|  K|
+-------------------+----+---+
only showing top 3 rows



In [12]:
# Removendo duplicatas -> DISTINCT
cereal = spark.sql('''SELECT DISTINCT type, mfr FROM cereal''')
cereal.show()
cereal.count()

+----+---+
|type|mfr|
+----+---+
|   C|  P|
|   C|  Q|
|   C|  N|
|   C|  R|
|   H|  N|
|   C|  G|
|   C|  K|
|   H|  Q|
|   H|  A|
+----+---+



9

In [13]:
# Where no Spark SQL
cereal = spark.sql('''SELECT * FROM cereal WHERE mfr = 'K' AND calories >= 100 ''')
cereal.count()
cereal.show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|         Apple Jacks|  K|   C|     110|      2|  0|   125|  1.0| 11.0|    14|    30|      25|    2|   1.0| 1.0|33.174094|
|         Corn Flakes|  K|   C|     100|      2|  0|   290|  1.0| 21.0|     2|    35|      25|    1|   1.0| 1.0|45.863324|
|           Corn Pops|  K|   C|     110|      1|  0|    90|  1.0| 13.0|    12|    20|      25|    2|   1.0| 1.0|35.782791|
|  Cracklin' Oat Bran|  K|   C|     110|      3|  3|   140|  4.0| 10.0|     7|   160|      25|    3|   1.0| 0.5|40.448772|
|             Crispix|  K|   C|     110|      2|  0|   220|  1.0| 21.0|     3|    30|      25|    3|   1.0| 1.0|46.895644|
|         Froot 

In [14]:
#  Group BY
# coutn - > funçao nativa do SQL
cereal = spark.sql(''' SELECT mfr, \
                              type, \
                              COUNT(*) AS total,
                              SUM(calories) AS Total_Calories
                        FROM cereal
                        GROUP BY \
                              mfr, \
                              type ''')
# count -> função nativa do Python
cereal.count()
cereal.show()

+---+----+-----+--------------+
|mfr|type|total|Total_Calories|
+---+----+-----+--------------+
|  P|   C|    9|           980|
|  K|   C|   23|          2500|
|  G|   C|   22|          2450|
|  Q|   C|    7|           660|
|  R|   C|    8|           920|
|  N|   H|    1|           100|
|  N|   C|    5|           420|
|  A|   H|    1|           100|
|  Q|   H|    1|           100|
+---+----+-----+--------------+



In [15]:
cereal = spark.sql(''' SELECT DISTINCT type
                       FROM cereal ''')
cereal.show()

+----+
|type|
+----+
|   C|
|   H|
+----+



In [16]:
# Case WHEN
cereal = spark.sql(''' SELECT mfr, \
                              type, \
                              (case
                              -- alterando as letras
                                    when type = 'C' then 'A'
                                    when type = 'H' then 'B'
                                    else 'C'
                              end) as type_new,
                              COUNT(*) AS total,
                              SUM(calories) AS Total_Calories
                        FROM cereal
                        GROUP BY \
                              mfr, \
                              type ''')
# count -> função nativa do Python
cereal.count()
cereal.show()

+---+----+--------+-----+--------------+
|mfr|type|type_new|total|Total_Calories|
+---+----+--------+-----+--------------+
|  P|   C|       A|    9|           980|
|  K|   C|       A|   23|          2500|
|  G|   C|       A|   22|          2450|
|  Q|   C|       A|    7|           660|
|  R|   C|       A|    8|           920|
|  N|   H|       B|    1|           100|
|  N|   C|       A|    5|           420|
|  A|   H|       B|    1|           100|
|  Q|   H|       B|    1|           100|
+---+----+--------+-----+--------------+



In [17]:
# Consultas Avançadas em SQL usando PySpark
cereal_op = spark.sql('''
                    SELECT mfr,
                          type,
                          sum(calories) as sum_cal,
                          min(calories) as min_cal,
                          max(calories) as max_cal,
                          avg(calories) avg_cal,
                          count(distinct name) as count_distinct_names,
                          count(name) as count_names
                    FROM cereal
                    GROUP BY mfr, type
                    ORDER BY mfr, type
                    ''')
cereal_op.show()

+---+----+-------+-------+-------+------------------+--------------------+-----------+
|mfr|type|sum_cal|min_cal|max_cal|           avg_cal|count_distinct_names|count_names|
+---+----+-------+-------+-------+------------------+--------------------+-----------+
|  A|   H|    100|    100|    100|             100.0|                   1|          1|
|  G|   C|   2450|    100|    140|111.36363636363636|                  22|         22|
|  K|   C|   2500|     50|    160|108.69565217391305|                  23|         23|
|  N|   C|    420|     70|     90|              84.0|                   5|          5|
|  N|   H|    100|    100|    100|             100.0|                   1|          1|
|  P|   C|    980|     90|    120|108.88888888888889|                   9|          9|
|  Q|   C|    660|     50|    120| 94.28571428571429|                   7|          7|
|  Q|   H|    100|    100|    100|             100.0|                   1|          1|
|  R|   C|    920|     90|    150|         

In [18]:
# Consultas Avançadas em SQL usando PySpark
cereal_op = spark.sql('''
                    SELECT mfr,
                           type,
                           sum(calories) as sum_cal,
                           min(calories) as min_cal,
                           max(calories) as max_cal,
                           cast(avg(calories) as decimal(10,2)) avg_cal,

                           sum(carbo) as sum_carbo,
                           min(carbo) as min_carbo,
                           max(carbo) as max_carbo,
                           cast(avg(carbo) as decimal(10,2)) as avg_carbo,

                           sum(vitamins) as sum_vitamins,
                           min(vitamins) as min_vitamins,
                           max(vitamins) as max_vitamins,
                           cast(avg(vitamins) as decimal(10,2)) avg_vitamins,

                           count(distinct name) as count_distinct_names,
                           count(name) as count_names
                    FROM cereal
                    GROUP BY mfr, type
                    ORDER BY mfr, type
                    ''')
cereal_op.show()

+---+----+-------+-------+-------+-------+---------+---------+---------+---------+------------+------------+------------+------------+--------------------+-----------+
|mfr|type|sum_cal|min_cal|max_cal|avg_cal|sum_carbo|min_carbo|max_carbo|avg_carbo|sum_vitamins|min_vitamins|max_vitamins|avg_vitamins|count_distinct_names|count_names|
+---+----+-------+-------+-------+-------+---------+---------+---------+---------+------------+------------+------------+------------+--------------------+-----------+
|  A|   H|    100|    100|    100| 100.00|     16.0|     16.0|     16.0|    16.00|          25|          25|          25|       25.00|                   1|          1|
|  G|   C|   2450|    100|    140| 111.36|    324.0|     10.5|     21.0|    14.73|         775|          25|         100|       35.23|                  22|         22|
|  K|   C|   2500|     50|    160| 108.70|    348.0|      7.0|     22.0|    15.13|         800|          25|         100|       34.78|                  23|     

In [19]:
cereal_op = spark.sql('''
                    SELECT mfr,
                           type,
                           (case
                                when mfr = 'A' then 'Abacaxi'
                                when mfr = 'G' then 'Goiaba'
                                when mfr = 'K' then 'Banana'
                                when mfr = 'N' then 'Maça'
                                when mfr = 'P' then 'Tomate'
                                when mfr = 'Q' then 'Pera'
                                when mfr = 'R' then 'Uva'
                                else 'NA'
                           end) as type_fruit,

                           sum(calories) as sum_cal,
                           min(calories) as min_cal,
                           max(calories) as max_cal,
                           cast(avg(calories) as decimal(10,2)) avg_cal,

                           sum(carbo) as sum_carbo,
                           min(carbo) as min_carbo,
                           max(carbo) as max_carbo,
                           cast(avg(carbo) as decimal(10,2)) as avg_carbo,

                           sum(vitamins) as sum_vitamins,
                           min(vitamins) as min_vitamins,
                           max(vitamins) as max_vitamins,
                           cast(avg(vitamins) as decimal(10,2)) avg_vitamins,

                           count(distinct name) as count_distinct_names,
                           count(name) as count_names
                    FROM cereal
                    GROUP BY mfr, type
                    ORDER BY mfr, type
                    ''')
cereal_op.show()

+---+----+----------+-------+-------+-------+-------+---------+---------+---------+---------+------------+------------+------------+------------+--------------------+-----------+
|mfr|type|type_fruit|sum_cal|min_cal|max_cal|avg_cal|sum_carbo|min_carbo|max_carbo|avg_carbo|sum_vitamins|min_vitamins|max_vitamins|avg_vitamins|count_distinct_names|count_names|
+---+----+----------+-------+-------+-------+-------+---------+---------+---------+---------+------------+------------+------------+------------+--------------------+-----------+
|  A|   H|   Abacaxi|    100|    100|    100| 100.00|     16.0|     16.0|     16.0|    16.00|          25|          25|          25|       25.00|                   1|          1|
|  G|   C|    Goiaba|   2450|    100|    140| 111.36|    324.0|     10.5|     21.0|    14.73|         775|          25|         100|       35.23|                  22|         22|
|  K|   C|    Banana|   2500|     50|    160| 108.70|    348.0|      7.0|     22.0|    15.13|         800

In [20]:
# Importing Data
# df = spark.read.csv('ceral.csv', sep = ',', interShema = True, header = True)
import requests
import pandas as pd
import io

sds = "https://raw.githubusercontent.com/SandraRojasZ/Pos_Tech_Data_Analytics/main/Base_de_Dados/sales_data_sample.csv"
#df = spark.read.csv('cereal.csv', sep = ',', inferSchema = True, header = True)
response = requests.get(sds)
response.raise_for_status()  # Raise an exception for bad status codes

# Convert the data to a Pandas DataFrame
data_s = response.text
df_pandas_s = pd.read_csv(io.StringIO(data_s))

In [21]:
sales = spark.createDataFrame(df_pandas_s)

print('sales.count :', sales.count())
print('sales.col ct :', len(sales.columns))
print('sales.columns:', sales.columns)
sales.show()

sales.count : 2823
sales.col ct : 25
sales.columns: ['ORDERNUMBER', 'QUANTITYORDERED', 'PRICEEACH', 'ORDERLINENUMBER', 'SALES', 'ORDERDATE', 'STATUS', 'QTR_ID', 'MONTH_ID', 'YEAR_ID', 'PRODUCTLINE', 'MSRP', 'PRODUCTCODE', 'CUSTOMERNAME', 'PHONE', 'ADDRESSLINE1', 'ADDRESSLINE2', 'CITY', 'STATE', 'POSTALCODE', 'COUNTRY', 'TERRITORY', 'CONTACTLASTNAME', 'CONTACTFIRSTNAME', 'DEALSIZE']
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZ

In [22]:
sales.printSchema()

root
 |-- ORDERNUMBER: long (nullable = true)
 |-- QUANTITYORDERED: long (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: long (nullable = true)
 |-- SALES: double (nullable = true)
 |-- ORDERDATE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- QTR_ID: long (nullable = true)
 |-- MONTH_ID: long (nullable = true)
 |-- YEAR_ID: long (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: long (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- ADDRESSLINE1: string (nullable = true)
 |-- ADDRESSLINE2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- TERRITORY: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- CONTACTFIRSTNAME: string (nullable = true)
 |-- DEALSIZE: st

In [23]:
# Tabela temporária dentro da sessão do SPARK SQL
sales.createOrReplaceTempView('sales')

In [24]:
data_sales = spark.sql('''
                    SELECT *
                    FROM sales
                      ''')
data_sales.show()

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|      10107|             30|     95.7|              2| 2871.0| 2/24/2003

In [25]:
calendar = spark.sql('''
                    SELECT DISTINCT orderdate, qtr_id, month_id, year_id
                    FROM sales
                    ORDER BY orderdate
                      ''')
calendar.show()

+--------------+------+--------+-------+
|     orderdate|qtr_id|month_id|year_id|
+--------------+------+--------+-------+
|1/10/2003 0:00|     1|       1|   2003|
|1/10/2005 0:00|     1|       1|   2005|
|1/12/2004 0:00|     1|       1|   2004|
|1/12/2005 0:00|     1|       1|   2005|
|1/15/2004 0:00|     1|       1|   2004|
|1/16/2004 0:00|     1|       1|   2004|
|1/19/2005 0:00|     1|       1|   2005|
| 1/2/2004 0:00|     1|       1|   2004|
|1/20/2005 0:00|     1|       1|   2005|
|1/22/2004 0:00|     1|       1|   2004|
|1/23/2005 0:00|     1|       1|   2005|
|1/26/2004 0:00|     1|       1|   2004|
|1/26/2005 0:00|     1|       1|   2005|
|1/29/2003 0:00|     1|       1|   2003|
|1/29/2004 0:00|     1|       1|   2004|
|1/31/2003 0:00|     1|       1|   2003|
|1/31/2005 0:00|     1|       1|   2005|
| 1/5/2005 0:00|     1|       1|   2005|
| 1/6/2003 0:00|     1|       1|   2003|
| 1/6/2005 0:00|     1|       1|   2005|
+--------------+------+--------+-------+
only showing top

In [36]:
# Criando Tabelas

calendar = spark.sql('''
                    SELECT DISTINCT orderdate, qtr_id, month_id, year_id
                    FROM sales
                    ORDER BY orderdate
                      ''')

sales_data = spark.sql('''
                    SELECT DISTINCT ordernumber,
                                    customername,
                                    orderdate,
                                    sales,
                                    quantityordered,
                                    orderlinenumber,
                                    priceeach
                    FROM sales
                    ORDER BY ordernumber
                      ''')

customers = spark.sql('''
                    SELECT DISTINCT customername,
                                    phone,
                                    sales,
                                    addressline1,
                                    addressline2,
                                    city,
                                    state,
                                    postalcode,
                                    country,
                                    territory
                    FROM sales
                    ORDER BY customername
                    ''')

# Materializar esse dados separados
# Criando tabelas temporárias
sales_data.createOrReplaceTempView('sales_data')
calendar.createOrReplaceTempView('calendar')
customers.createOrReplaceTempView('customers')

In [28]:
calendar.count()

252

In [29]:
sales_data.count()


2823

In [30]:
customers.count()

2823

In [33]:
# Como se fosse uma tabela master com a junção de dados
# União das tabelas
master = spark.sql('''
                   SELECT *
                   FROM sales_data s
                   INNER JOIN customers c ON s.CUSTOMERNAME = c.CUSTOMERNAME
                   ''')
master.show(5)

+-----------+---------------+---------------+-------+---------------+---------------+---------+---------------+--------------+-------+--------------------+------------+-----------+----------+-----------+---------+
|ordernumber|   customername|      orderdate|  sales|quantityordered|orderlinenumber|priceeach|   customername|         phone|  sales|        addressline1|addressline2|      state|postalcode|    country|territory|
+-----------+---------------+---------------+-------+---------------+---------------+---------+---------------+--------------+-------+--------------------+------------+-----------+----------+-----------+---------+
|      10198|Cruz & Sons Co.|11/27/2003 0:00|3255.36|             48|              5|    67.82|Cruz & Sons Co.|+63 2 555 3587|5265.15|15 McCallum Stree...|         NaN|Makati City|   1227 MM|Philippines|    Japan|
|      10108|Cruz & Sons Co.|  3/3/2003 0:00|2914.11|             43|             12|    67.77|Cruz & Sons Co.|+63 2 555 3587|5265.15|15 McCallu

In [38]:
master_c = spark.sql('''
                  SELECT DISTINCT s.ordernumber, c.city
                  FROM sales_data s
                  INNER JOIN customers c ON s.CUSTOMERNAME = c. CUSTOMERNAME
                  ''')
master_c.show(5)

+-----------+---------+
|ordernumber|     city|
+-----------+---------+
|      10262|   Madrid|
|      10281|Allentown|
|      10194|     Lyon|
|      10242|      NYC|
|      10247|    Espoo|
+-----------+---------+
only showing top 5 rows

