# Aula 1

## Instalação do PySpark

In [1]:
# !pip install pyspark
# !pip istall findspark

In [5]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as f
from functools import reduce

In [2]:
findspark.init()
spark = SparkSession.builder.master("local[*]").getOrCreate()

23/09/05 20:04:00 WARN Utils: Your hostname, MacBook-Air-de-Marcelo.local resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface en0)
23/09/05 20:04:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/05 20:04:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
df = spark.sql('''SELECT 'Sucesso total, estamos online' as hello''')
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+
|               hello|
+--------------------+
|Sucesso total, es...|
+--------------------+



                                                                                

## Data Manipulation using Spark

In [6]:
df = spark.read.csv('../../../Data/Fase 3/banklist.csv', sep=',', inferSchema=True, header=True)

print('df.count   :', df.count())
print('df.col ct  :', len(df.columns))
print('df.columns :', df.columns)

df.count   : 561
df.col ct  : 6
df.columns : ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


## Using SQL in PySpark

In [7]:
df.createOrReplaceTempView('banklist')

df_check = spark.sql('''select 'Bank Name', City, 'Closing Date' from banklist''')
df_check.show(4, truncate=False)

+---------+-------------+------------+
|Bank Name|City         |Closing Date|
+---------+-------------+------------+
|Bank Name|Barboursville|Closing Date|
|Bank Name|Ericson      |Closing Date|
|Bank Name|Newark       |Closing Date|
|Bank Name|Maumee       |Closing Date|
+---------+-------------+------------+
only showing top 4 rows



## DataFrame Basics Operations

In [8]:
df.describe().show()

                                                                                

+-------+--------------------+-------+----+-----------------+---------------------+------------+
|summary|           Bank Name|   City|  ST|             CERT|Acquiring Institution|Closing Date|
+-------+--------------------+-------+----+-----------------+---------------------+------------+
|  count|                 561|    561| 561|              561|                  561|         561|
|   mean|                null|   null|null|31685.68449197861|                 null|        null|
| stddev|                null|   null|null|16446.65659309965|                 null|        null|
|    min|1st American Stat...|Acworth|  AL|               91|      1st United Bank|    1-Aug-08|
|    max|               ebank|Wyoming|  WY|            58701|  Your Community Bank|    9-Sep-11|
+-------+--------------------+-------+----+-----------------+---------------------+------------+



In [9]:
df.describe('City', 'ST').show()

                                                                                

+-------+-------+----+
|summary|   City|  ST|
+-------+-------+----+
|  count|    561| 561|
|   mean|   null|null|
| stddev|   null|null|
|    min|Acworth|  AL|
|    max|Wyoming|  WY|
+-------+-------+----+



## Count, Columns and Schema

In [12]:
print('Total de linhas   :', df.count())
print('Total de colunas   :', len(df.columns))
print('Colunas :', df.columns)
print('Tipo de dados :', df.dtypes)
print('Shema :', df.schema)

Total de linhas   : 561
Total de colunas   : 6
Colunas : ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']
Tipo de dados : [('Bank Name', 'string'), ('City', 'string'), ('ST', 'string'), ('CERT', 'int'), ('Acquiring Institution', 'string'), ('Closing Date', 'string')]
Shema : StructType(List(StructField(Bank Name,StringType,true),StructField(City,StringType,true),StructField(ST,StringType,true),StructField(CERT,IntegerType,true),StructField(Acquiring Institution,StringType,true),StructField(Closing Date,StringType,true)))


In [13]:
df.printSchema()

root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)



## Remove Duplicates

In [14]:
df = df.dropDuplicates()
print('Total de linhas   :', df.count())
print('Colunas :', df.columns)



Total de linhas   : 561
Colunas : ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


                                                                                

## Select Specific Columns

In [15]:
df2 = df.select(*['Bank Name', 'City'])
df2.show()

+--------------------+----------------+
|           Bank Name|            City|
+--------------------+----------------+
| First Bank of Idaho|         Ketchum|
|Amcore Bank, Nati...|        Rockford|
|        Venture Bank|           Lacey|
|First State Bank ...|           Altus|
|Valley Capital Ba...|            Mesa|
|Michigan Heritage...|Farmington Hills|
|Columbia Savings ...|      Cincinnati|
|       Fidelity Bank|        Dearborn|
|The Park Avenue Bank|        Valdosta|
|Western Commercia...|  Woodland Hills|
|        Syringa Bank|           Boise|
|Republic Federal ...|           Miami|
|Westside Communit...|University Place|
|   First United Bank|           Crete|
|HarVest Bank of M...|    Gaithersburg|
|            BankEast|       Knoxville|
|    Polk County Bank|        Johnston|
|Colorado Capital ...|     Castle Rock|
|         Access Bank|        Champlin|
|Pacific National ...|   San Francisco|
+--------------------+----------------+
only showing top 20 rows



## Select Multiple Columns

In [16]:
col_1 = list(set(df.columns) - {'CRET', 'ST'})
df2 = df.select(*col_1)
df2.show(2)

+-----+------------+--------+---------------------+--------------------+
| CERT|Closing Date|    City|Acquiring Institution|           Bank Name|
+-----+------------+--------+---------------------+--------------------+
|34396|   24-Apr-09| Ketchum|      U.S. Bank, N.A.| First Bank of Idaho|
| 3735|   23-Apr-10|Rockford|          Harris N.A.|Amcore Bank, Nati...|
+-----+------------+--------+---------------------+--------------------+
only showing top 2 rows



## Rename Columns

In [18]:
df2 = df \
    .withColumnRenamed('Bank Name', 'bank_name') \
    .withColumnRenamed('Acquiring Institution', 'acq_institution') \
    .withColumnRenamed('Closing Date', 'closing_date') \
    .withColumnRenamed('ST', 'state') \
    .withColumnRenamed('CERT', 'cert')

df2.show()

+--------------------+----------------+-----+-----+--------------------+------------+
|           bank_name|            City|state| cert|     acq_institution|closing_date|
+--------------------+----------------+-----+-----+--------------------+------------+
| First Bank of Idaho|         Ketchum|   ID|34396|     U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|        Rockford|   IL| 3735|         Harris N.A.|   23-Apr-10|
|        Venture Bank|           Lacey|   WA|22868|First-Citizens Ba...|   11-Sep-09|
|First State Bank ...|           Altus|   OK| 9873|        Herring Bank|   31-Jul-09|
|Valley Capital Ba...|            Mesa|   AZ|58399|Enterprise Bank &...|   11-Dec-09|
|Michigan Heritage...|Farmington Hills|   MI|34369|      Level One Bank|   24-Apr-09|
|Columbia Savings ...|      Cincinnati|   OH|32284|United Fidelity B...|   23-May-14|
|       Fidelity Bank|        Dearborn|   MI|33883|The Huntington Na...|   30-Mar-12|
|The Park Avenue Bank|        Valdosta|   GA|19797|  B

## Add Columns

In [21]:
df2 = df.withColumn('state', col('ST'))
df2.show(2)

+--------------------+--------+---+-----+---------------------+------------+-----+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|state|
+--------------------+--------+---+-----+---------------------+------------+-----+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|   ID|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|   IL|
+--------------------+--------+---+-----+---------------------+------------+-----+
only showing top 2 rows



## Add constant column

In [22]:
df2 = df2.withColumn('country', lit('US'))
df2.show(2)

+--------------------+--------+---+-----+---------------------+------------+-----+-------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|state|country|
+--------------------+--------+---+-----+---------------------+------------+-----+-------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|   ID|     US|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|   IL|     US|
+--------------------+--------+---+-----+---------------------+------------+-----+-------+
only showing top 2 rows



## Drop Column

In [23]:
df2 = df.drop('CERT')
df2.show(2)

+--------------------+--------+---+---------------------+------------+
|           Bank Name|    City| ST|Acquiring Institution|Closing Date|
+--------------------+--------+---+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+---------------------+------------+
only showing top 2 rows



## Drop Multiple Column

In [24]:
df2 = df.drop(*['ST', 'CERT'])
df2.show(2)

+--------------------+--------+---------------------+------------+
|           Bank Name|    City|Acquiring Institution|Closing Date|
+--------------------+--------+---------------------+------------+
| First Bank of Idaho| Ketchum|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---------------------+------------+
only showing top 2 rows



In [25]:
df2 = reduce(DataFrame.drop, ['CERT', 'ST'], df)
df2.show(2)

+--------------------+--------+---------------------+------------+
|           Bank Name|    City|Acquiring Institution|Closing Date|
+--------------------+--------+---------------------+------------+
| First Bank of Idaho| Ketchum|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---------------------+------------+
only showing top 2 rows



## Filter Data

In [26]:
# Equal to values
df2 = df.where(df['ST'] == 'NE')

# Between values
df3 = df.where(df['CERT'].between('1000', '2000'))

# Is inside multiple values
df4 = df.where(df['ST'].isin('NE', 'IL'))

print('df.count: ', df.count())
print('df2.count: ', df2.count())
print('df3.count: ', df3.count())
print('df4.count: ', df4.count())

                                                                                

df.count:  561


                                                                                

df2.count:  4


                                                                                

df3.count:  9




df4.count:  73


                                                                                

## Filter Data using Logical Operations

In [29]:
df2 = df.where((df['ST'] == 'NE') & (df['City'] == 'Ericson'))
df2.show(3)

+------------------+-------+---+-----+---------------------+------------+
|         Bank Name|   City| ST| CERT|Acquiring Institution|Closing Date|
+------------------+-------+---+-----+---------------------+------------+
|Ericson State Bank|Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|
+------------------+-------+---+-----+---------------------+------------+



## Replace Values in DataFrame

In [30]:
# Pre replace
df.show(2)

# Post replace
print('Replace 7 in the above dataframe with 17 at all instances')
df.na.replace(7,17).show(2)

+--------------------+--------+---+-----+---------------------+------------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|
+--------------------+--------+---+-----+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+-----+---------------------+------------+
only showing top 2 rows

Replace 7 in the above dataframe with 17 at all instances
+--------------------+--------+---+-----+---------------------+------------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|
+--------------------+--------+---+-----+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+-----+---------------------+-------

In [None]:
spark.stop()

# Aula 2

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [2]:
sc = SparkContext.getOrCreate()

23/09/05 21:47:33 WARN Utils: Your hostname, MacBook-Air-de-Marcelo.local resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface en0)
23/09/05 21:47:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/05 21:47:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark = SparkSession.builder.appName('PySpark Dataframe From RDD').getOrCreate()

## Create PySpark DataFrame from an Existing RDD

In [4]:
rdd = sc.parallelize([('C', 85, 76, 87, 91), ('B', 85, 76, 87, 91), ('A', 85, 78, 96, 92), ('A', 92, 76, 89, 96)], 4)

In [5]:
print(type(rdd))

<class 'pyspark.rdd.RDD'>


In [6]:
sub = ['id_person', 'value_1', 'value_2', 'value_3', 'value_4']

In [7]:
marks_df = spark.createDataFrame(rdd, schema=sub)

                                                                                

In [8]:
print(type(marks_df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [9]:
marks_df.printSchema()

root
 |-- id_person: string (nullable = true)
 |-- value_1: long (nullable = true)
 |-- value_2: long (nullable = true)
 |-- value_3: long (nullable = true)
 |-- value_4: long (nullable = true)



In [10]:
marks_df.show()

                                                                                

+---------+-------+-------+-------+-------+
|id_person|value_1|value_2|value_3|value_4|
+---------+-------+-------+-------+-------+
|        C|     85|     76|     87|     91|
|        B|     85|     76|     87|     91|
|        A|     85|     78|     96|     92|
|        A|     92|     76|     89|     96|
+---------+-------+-------+-------+-------+



## Creating and Manipulation in PySpark DataFrame

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pysparkdf').getOrCreate()

23/09/06 11:09:24 WARN Utils: Your hostname, MacBook-Air-de-Marcelo.local resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface en0)
23/09/06 11:09:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/06 11:09:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Importing Data

In [2]:
df = spark.read.csv('../../../Data/Fase 3/cereal.csv', sep=',', inferSchema=True, header=True)

                                                                                

## Reading the Schema

In [3]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- mfr: string (nullable = true)
 |-- type: string (nullable = true)
 |-- calories: integer (nullable = true)
 |-- protein: integer (nullable = true)
 |-- fat: integer (nullable = true)
 |-- sodium: integer (nullable = true)
 |-- fiber: double (nullable = true)
 |-- carbo: double (nullable = true)
 |-- sugars: integer (nullable = true)
 |-- potass: integer (nullable = true)
 |-- vitamins: integer (nullable = true)
 |-- shelf: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- cups: double (nullable = true)
 |-- rating: double (nullable = true)



## Select()

In [4]:
df.select('name', 'mfr', 'rating').show()

+--------------------+---+---------+
|                name|mfr|   rating|
+--------------------+---+---------+
|           100% Bran|  N|68.402973|
|   100% Natural Bran|  Q|33.983679|
|            All-Bran|  K|59.425505|
|All-Bran with Ext...|  K|93.704912|
|      Almond Delight|  R|34.384843|
|Apple Cinnamon Ch...|  G|29.509541|
|         Apple Jacks|  K|33.174094|
|             Basic 4|  G|37.038562|
|           Bran Chex|  R|49.120253|
|         Bran Flakes|  P|53.313813|
|        Cap'n'Crunch|  Q|18.042851|
|            Cheerios|  G|50.764999|
|Cinnamon Toast Cr...|  G|19.823573|
|            Clusters|  G|40.400208|
|         Cocoa Puffs|  G|22.736446|
|           Corn Chex|  R|41.445019|
|         Corn Flakes|  K|45.863324|
|           Corn Pops|  K|35.782791|
|       Count Chocula|  G|22.396513|
|  Cracklin' Oat Bran|  K|40.448772|
+--------------------+---+---------+
only showing top 20 rows



## withColumn()

In [5]:
df.withColumn('Calories', df['calories'].cast('Integer')).printSchema()

root
 |-- name: string (nullable = true)
 |-- mfr: string (nullable = true)
 |-- type: string (nullable = true)
 |-- Calories: integer (nullable = true)
 |-- protein: integer (nullable = true)
 |-- fat: integer (nullable = true)
 |-- sodium: integer (nullable = true)
 |-- fiber: double (nullable = true)
 |-- carbo: double (nullable = true)
 |-- sugars: integer (nullable = true)
 |-- potass: integer (nullable = true)
 |-- vitamins: integer (nullable = true)
 |-- shelf: integer (nullable = true)
 |-- weight: double (nullable = true)
 |-- cups: double (nullable = true)
 |-- rating: double (nullable = true)



## GroupBy

In [10]:
df.groupBy('Calories').count().show()

                                                                                

+--------+-----+
|Calories|count|
+--------+-----+
|     140|    3|
|     120|   10|
|     100|   17|
|     130|    2|
|      50|    3|
|      80|    1|
|     160|    1|
|      70|    2|
|      90|    7|
|     110|   29|
|     150|    2|
+--------+-----+



                                                                                

## OrderBy

In [12]:
df.orderBy('calories').show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|        Puffed Wheat|  Q|   C|      50|      2|  0|     0|  1.0| 10.0|     0|    50|       0|    3|   0.5| 1.0|63.005645|
|         Puffed Rice|  Q|   C|      50|      1|  0|     0|  0.0| 13.0|     0|    15|       0|    3|   0.5| 1.0|60.756112|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|      Shredded 

## Case When

In [13]:
from pyspark.sql.functions import when

In [15]:
df.select('name', 'vitamins', when(df.vitamins >= '25', 'rich in vitamins')).show()

+--------------------+--------+----------------------------------------------------+
|                name|vitamins|CASE WHEN (vitamins >= 25) THEN rich in vitamins END|
+--------------------+--------+----------------------------------------------------+
|           100% Bran|      25|                                    rich in vitamins|
|   100% Natural Bran|       0|                                                null|
|            All-Bran|      25|                                    rich in vitamins|
|All-Bran with Ext...|      25|                                    rich in vitamins|
|      Almond Delight|      25|                                    rich in vitamins|
|Apple Cinnamon Ch...|      25|                                    rich in vitamins|
|         Apple Jacks|      25|                                    rich in vitamins|
|             Basic 4|      25|                                    rich in vitamins|
|           Bran Chex|      25|                                  

## Filter

In [17]:
df.filter(df.calories >= '100').show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon Ch...|  G|   C|     110|      2|  2|   180|  1.5| 10.5|    10|    70|      25|    1|   1.0|0.75|29.509541|
|         Apple Jacks|  K|   C|     110|      2|  0|   125|  1.0| 11.0|    14|    30|      25|    2|   1.0| 1.0|33.174094|
|             Basic 4|  G|   C|     130|      3|  2|   210|  2.0| 18.0|     8|   100|      25|    3|  1.33|0.75|37.038562|
|        Cap'n'C

## Isnull() / isnotnull()

In [18]:
from pyspark.sql.functions import *

In [19]:
df.filter(df.name.isNotNull()).show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon 

In [20]:
df.filter(df.name.isNull()).show()

+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+
|name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|rating|
+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+
+----+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+------+



In [None]:
spark.stop()

# Aula 3

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

23/09/07 13:28:02 WARN Utils: Your hostname, MacBook-Air-de-Marcelo.local resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface en0)
23/09/07 13:28:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/07 13:28:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Spark SQL Consultas e Seleções

In [2]:
df = spark.sql('''select  'OK' as Status''')
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+------+
|Status|
+------+
|    OK|
+------+



                                                                                

## Importing Data

In [3]:
df = spark.read.csv('../../../Data/Fase 3/cereal.csv', sep=',', inferSchema=True, header=True)

In [4]:
df.show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon 

## Manipulation Data with Spark SQL

In [5]:
df.createOrReplaceTempView('cereal')

In [6]:
cereal = spark.sql('''select * from cereal''')
cereal.show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon 

In [7]:
df.show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon 

In [8]:
cereal = spark.sql(''' select * from cereal where type = 'C' ''')
cereal.show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon 

In [9]:
df2 = df.where(df['type'] == 'C')
df2.show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|           100% Bran|  N|   C|      70|      4|  1|   130| 10.0|  5.0|     6|   280|      25|    3|   1.0|0.33|68.402973|
|   100% Natural Bran|  Q|   C|     120|      3|  5|    15|  2.0|  8.0|     8|   135|       0|    3|   1.0| 1.0|33.983679|
|            All-Bran|  K|   C|      70|      4|  1|   260|  9.0|  7.0|     5|   320|      25|    3|   1.0|0.33|59.425505|
|All-Bran with Ext...|  K|   C|      50|      4|  0|   140| 14.0|  8.0|     0|   330|      25|    3|   1.0| 0.5|93.704912|
|      Almond Delight|  R|   C|     110|      2|  2|   200|  1.0| 14.0|     8|    -1|      25|    3|   1.0|0.75|34.384843|
|Apple Cinnamon 

In [10]:
df2 = df.where(df['mfr'] == 'G')
df2.show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|Apple Cinnamon Ch...|  G|   C|     110|      2|  2|   180|  1.5| 10.5|    10|    70|      25|    1|   1.0|0.75|29.509541|
|             Basic 4|  G|   C|     130|      3|  2|   210|  2.0| 18.0|     8|   100|      25|    3|  1.33|0.75|37.038562|
|            Cheerios|  G|   C|     110|      6|  2|   290|  2.0| 17.0|     1|   105|      25|    1|   1.0|1.25|50.764999|
|Cinnamon Toast Cr...|  G|   C|     120|      1|  3|   210|  0.0| 13.0|     9|    45|      25|    2|   1.0|0.75|19.823573|
|            Clusters|  G|   C|     110|      3|  2|   140|  2.0| 13.0|     7|   105|      25|    3|   1.0| 0.5|40.400208|
|         Cocoa 

## Select distinct no SparkSQL

In [12]:
cereal = spark.sql(''' SELECT DISTINCT type, mfr FROM cereal ''')
cereal.show()

+----+---+
|type|mfr|
+----+---+
|   C|  P|
|   C|  Q|
|   C|  N|
|   H|  Q|
|   C|  R|
|   H|  N|
|   C|  G|
|   H|  A|
|   C|  K|
+----+---+



## WHERE no SparkSQL

In [14]:
cereal = spark.sql(''' SELECT * FROM cereal WHERE mfr = 'N' AND calories >= 100 ''')
cereal.show()

+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|                name|mfr|type|calories|protein|fat|sodium|fiber|carbo|sugars|potass|vitamins|shelf|weight|cups|   rating|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+
|Cream of Wheat (Q...|  N|   H|     100|      3|  0|    80|  1.0| 21.0|     0|    -1|       0|    2|   1.0| 1.0|64.533816|
+--------------------+---+----+--------+-------+---+------+-----+-----+------+------+--------+-----+------+----+---------+



## GROUP BY

In [18]:
cereal = spark.sql('''  SELECT mfr,
                            type,
                            COUNT(*) AS total,
                            SUM(calories) AS total_calories
                        FROM cereal
                        GROUP BY mfr,
                            type ''')
cereal.show()

+---+----+-----+--------------+
|mfr|type|total|total_calories|
+---+----+-----+--------------+
|  A|   H|    1|           100|
|  P|   C|    9|           980|
|  K|   C|   23|          2500|
|  G|   C|   22|          2450|
|  Q|   C|    7|           660|
|  R|   C|    8|           920|
|  Q|   H|    1|           100|
|  N|   H|    1|           100|
|  N|   C|    5|           420|
+---+----+-----+--------------+



## CASE WHEN

In [21]:
cereal = spark.sql('''  SELECT mfr,
                            type,
                            CASE
                                WHEN type = 'C' THEN 'A'
                                WHEN type = 'H' THEN 'B'
                                ELSE 'C'
                            END AS type_new,
                            COUNT(*) AS total,
                            SUM(calories) AS total_calories
                        FROM cereal
                        GROUP BY mfr,
                            type ''')
cereal.show()

                                                                                

+---+----+--------+-----+--------------+
|mfr|type|type_new|total|total_calories|
+---+----+--------+-----+--------------+
|  A|   H|       B|    1|           100|
|  P|   C|       A|    9|           980|
|  K|   C|       A|   23|          2500|
|  G|   C|       A|   22|          2450|
|  Q|   C|       A|    7|           660|
|  R|   C|       A|    8|           920|
|  Q|   H|       B|    1|           100|
|  N|   H|       B|    1|           100|
|  N|   C|       A|    5|           420|
+---+----+--------+-----+--------------+



## Consultas Avançadas em SQL usando PySpark

In [23]:
cereal = spark.sql('''
                    SELECT mfr,
                            type,
                            SUM(calories) AS sum_calories,
                            MIN(calories) AS min_calories,
                            MAX(calories) AS max_calories,
                            AVG(calories) AS avg_calories,
                            COUNT(DISTINCT name) AS count_distinct_name,
                            COUNT(name) AS count_name
                    FROM cereal
                    GROUP BY
                        mfr,
                        type
                    ORDER BY 1, 2
                    ''')
cereal.show()

                                                                                

+---+----+------------+------------+------------+------------------+-------------------+----------+
|mfr|type|sum_calories|min_calories|max_calories|      avg_calories|count_distinct_name|count_name|
+---+----+------------+------------+------------+------------------+-------------------+----------+
|  A|   H|         100|         100|         100|             100.0|                  1|         1|
|  G|   C|        2450|         100|         140|111.36363636363636|                 22|        22|
|  K|   C|        2500|          50|         160|108.69565217391305|                 23|        23|
|  N|   C|         420|          70|          90|              84.0|                  5|         5|
|  N|   H|         100|         100|         100|             100.0|                  1|         1|
|  P|   C|         980|          90|         120|108.88888888888889|                  9|         9|
|  Q|   C|         660|          50|         120| 94.28571428571429|                  7|         7|


In [24]:
cereal = spark.sql('''
                    SELECT mfr,
                            type,
                            SUM(calories) AS sum_calories,
                            MIN(calories) AS min_calories,
                            MAX(calories) AS max_calories,
                            CAST(AVG(calories) AS DECIMAL(10, 2)) AS avg_calories,
                   
                            SUM(carbo) AS sum_carbo,
                            MIN(carbo) AS min_carbo,
                            MAX(carbo) AS max_carbo,
                            CAST(AVG(carbo) AS DECIMAL(10, 2)) AS avg_carbo,
                   
                            SUM(vitamins) AS sum_vitamins,
                            MIN(vitamins) AS min_vitamins,
                            MAX(vitamins) AS max_vitamins,
                            CAST(AVG(vitamins) AS DECIMAL(10, 2)) AS avg_vitamins,

                            COUNT(DISTINCT name) AS count_distinct_name,
                            COUNT(name) AS count_name
                    FROM cereal
                    GROUP BY
                        mfr,
                        type
                    ORDER BY 1, 2
                    ''')
cereal.show()

                                                                                

+---+----+------------+------------+------------+------------+---------+---------+---------+---------+------------+------------+------------+------------+-------------------+----------+
|mfr|type|sum_calories|min_calories|max_calories|avg_calories|sum_carbo|min_carbo|max_carbo|avg_carbo|sum_vitamins|min_vitamins|max_vitamins|avg_vitamins|count_distinct_name|count_name|
+---+----+------------+------------+------------+------------+---------+---------+---------+---------+------------+------------+------------+------------+-------------------+----------+
|  A|   H|         100|         100|         100|      100.00|     16.0|     16.0|     16.0|    16.00|          25|          25|          25|       25.00|                  1|         1|
|  G|   C|        2450|         100|         140|      111.36|    324.0|     10.5|     21.0|    14.73|         775|          25|         100|       35.23|                 22|        22|
|  K|   C|        2500|          50|         160|      108.70|    348.

In [25]:
cereal = spark.sql('''
                    SELECT mfr,
                            type,
                            CASE
                                WHEN mfr = 'A' THEN 'Abacaxi'
                                WHEN mfr = 'G' THEN 'Goiaba'
                                WHEN mfr = 'K' THEN 'Banana'
                                WHEN mfr = 'N' THEN 'Maça'
                                WHEN mfr = 'P' THEN 'Tomate'
                                WHEN mfr = 'Q' THEN 'Pera'
                                WHEN mfr = 'R' THEN 'Uva'
                                ELSE 'NA'
                            END AS type_fruit,
                            SUM(calories) AS sum_calories,
                            MIN(calories) AS min_calories,
                            MAX(calories) AS max_calories,
                            CAST(AVG(calories) AS DECIMAL(10, 2)) AS avg_calories,
                   
                            SUM(carbo) AS sum_carbo,
                            MIN(carbo) AS min_carbo,
                            MAX(carbo) AS max_carbo,
                            CAST(AVG(carbo) AS DECIMAL(10, 2)) AS avg_carbo,
                   
                            SUM(vitamins) AS sum_vitamins,
                            MIN(vitamins) AS min_vitamins,
                            MAX(vitamins) AS max_vitamins,
                            CAST(AVG(vitamins) AS DECIMAL(10, 2)) AS avg_vitamins,

                            COUNT(DISTINCT name) AS count_distinct_name,
                            COUNT(name) AS count_name
                    FROM cereal
                    GROUP BY
                        mfr,
                        type
                    ORDER BY 1, 2
                    ''')
cereal.show()

                                                                                

+---+----+----------+------------+------------+------------+------------+---------+---------+---------+---------+------------+------------+------------+------------+-------------------+----------+
|mfr|type|type_fruit|sum_calories|min_calories|max_calories|avg_calories|sum_carbo|min_carbo|max_carbo|avg_carbo|sum_vitamins|min_vitamins|max_vitamins|avg_vitamins|count_distinct_name|count_name|
+---+----+----------+------------+------------+------------+------------+---------+---------+---------+---------+------------+------------+------------+------------+-------------------+----------+
|  A|   H|   Abacaxi|         100|         100|         100|      100.00|     16.0|     16.0|     16.0|    16.00|          25|          25|          25|       25.00|                  1|         1|
|  G|   C|    Goiaba|        2450|         100|         140|      111.36|    324.0|     10.5|     21.0|    14.73|         775|          25|         100|       35.23|                 22|        22|
|  K|   C|    B

## JOINs

### INNER

In [None]:
sales = spark.read.csv('../../../Data/Fase 3/sales_data_samples.csv', sep=',', inferSchema=True, header=True)

In [None]:
sales.show()

In [None]:
sales.printSchema()

In [None]:
sales.createOrReplaceTempView('sales')

In [None]:
calendar = spark.sql(''' 
                    SELECT DISTINCT orderdate,
                        qtr_id,
                        mount_id,
                        year_id
                    FROM sales
                    ORDER BY orderdate
                ''')

sales_data = spark.sql(''' 
                    SELECT DISTINCT ordernumber,
                        customername,
                        orderdate,
                        sales,
                        quantityordered,
                        productcode,
                        orderlinenumber,
                        priceeach
                    FROM sales
                    ORDER BY ordernumber
                ''')

customers = spark.sql(''' 
                    SELECT DISTINCT customername,
                        phone,
                        addressline1,
                        addressline2,
                        city,
                        state,
                        postalcode,
                        territory
                    FROM sales
                    ORDER BY customername
                ''')

calendar.createOrReplaceTempView('calendar')
sales_data.createOrReplaceTempView('sales_data')
customers.createOrReplaceTempView('customers')

In [None]:
calendar.count()

In [None]:
sales_data.count()

In [None]:
customers.count()

In [None]:
master = spark.sql(''' 
                    SELECT *
                    FROM sales_data s
                    INNER JOIN customers c ON s.customername = c.customername
                ''')
master.show(5)

In [12]:
spark.stop()

# Aula 4

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('PySpark DataFrame').getOrCreate()

23/09/07 14:52:41 WARN Utils: Your hostname, MacBook-Air-de-Marcelo.local resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface en0)
23/09/07 14:52:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/07 14:52:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Transformations

### map()

In [2]:
data = [1, 2, 3, 4, 5]
myRDD = sc.parallelize(data)
newRDD = myRDD.map(lambda x: x*2)
print(newRDD.collect())

[Stage 0:>                                                          (0 + 8) / 8]

[2, 4, 6, 8, 10]


                                                                                

### filter()

In [5]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
myRDD = sc.parallelize(data)
newRDD = myRDD.filter(lambda x: x%2 == 0)
print(newRDD.collect())

[2, 4, 6, 8, 10]


### distinct()

In [8]:
data = [1, 1, 1, 2, 2, 2, 3, 3, 3, 3]
myRDD = sc.parallelize(data)
newRDD = myRDD.distinct()
print(newRDD.collect())#count())

[1, 2, 3]


### groupByKey()

In [9]:
myRDD = sc.parallelize([('a', 1), ('a', 2), ('a', 3), ('b', 1)])
resultList = myRDD.groupByKey().mapValues(list)
resultList.collect()

[('a', [1, 2, 3]), ('b', [1])]

### reduceByKey()

In [10]:
from operator import add
myRDD = sc.parallelize([('a', 1), ('a', 2), ('a', 3), ('b', 1)])
newRDD = myRDD.reduceByKey(add)
newRDD.collect()

[('a', 6), ('b', 1)]

### SortByKey

In [12]:
myRDD = sc.parallelize([('c', 1), ('d', 2), ('a', 3), ('b', 4)])
newRDD = myRDD.sortByKey() # type: ignore
newRDD.collect()

[('a', 3), ('b', 4), ('c', 1), ('d', 2)]

### UNION()

In [13]:
myRDD1 = sc.parallelize([1, 2, 3, 4])
myRDD2 = sc.parallelize([3, 4, 5, 6, 7])
newRDD = myRDD1.union(myRDD2)
newRDD.collect()

[1, 2, 3, 4, 3, 4, 5, 6, 7]

## Actions

### count()

In [14]:
data = [1, 1, 1, 2, 3, 4, 5, 6, 7, 8, 8]
myRDD = sc.parallelize(data)
myRDD.count()

11

### reduce()

In [15]:
data = [1, 2, 3, 4, 5]
myRDD = sc.parallelize(data)
myRDD.reduce(lambda x, y: x * y)

120

### foreach()

In [16]:
def fun(x):
    print(x)

data = ['Scala', 'Python', 'Java', 'R']
myRDD = sc.parallelize(data)
myRDD.foreach(fun)

R
Scala
Python
Java


### countByValue()

In [17]:
data = ['Scala', 'Python', 'Java', 'R', 'Scala', 'Python', 'Java', 'R']
myRDD = sc.parallelize(data)
myRDD.countByValue().items()

dict_items([('Scala', 2), ('Python', 2), ('Java', 2), ('R', 2)])

### countByKey()

In [18]:
data = ['Scala', 'Python', 'Java', 'R', 'Scala', 'Python', 'Java', 'R']
myRDD = sc.parallelize(data)
myRDD.countByKey().items() # type: ignore

dict_items([('S', 2), ('P', 2), ('J', 2), ('R', 2)])

### take(n)

In [20]:
data = [2, 5, 3, 8, 4]
myRDD = sc.parallelize(data)
myRDD.take(3)

[2, 5, 3]

### top(n)

In [21]:
data = [2, 5, 3, 8, 4]
myRDD = sc.parallelize(data)
myRDD.top(3) # type: ignore

[8, 5, 4]

In [22]:
spark.stop()

# Aula 5

In [1]:
import scipy
from scipy import stats
import numpy as np

In [2]:
matrix = np.array([[8, 0 , 3, 4, 6], [5, 6, 1, 8, 9], [8, 0, 0, 5, 10]])
novoUser = [8, 0, 2, 3, 0]
nao_assistidos = [0, 1, 0, 0, 1]
print(matrix)
nomeFilmes = ['Round 6', 'A Invocação do Mal', '9 Desconhecidos', 'You', 'La Casa de Papel']

[[ 8  0  3  4  6]
 [ 5  6  1  8  9]
 [ 8  0  0  5 10]]


In [3]:
# criar um vetor com 3 posições e preencher com zero
similarity = [0] * 3
print(matrix[0][0])
print(matrix[1][0])
print(matrix[1, :])
# para cada usuário no sistema
for i in range(0, 3):
    # vamos pegar os dados desse usuário
    temp = matrix[i, :]
    print(i, temp)
    # mas queremos apenas comparar os dados que o novo user assistiu (novoUser != 0)
    tempUser = [t for n, t in zip(novoUser, temp) if n != 0 ]
    tempNovoUser = [n for n in novoUser if n != 0]
    # para verificar o processo
    print('user:', i)
    print(tempUser, tempNovoUser)
    # calcular o pearson
    similarity[i] = stats.pearsonr(tempUser, tempNovoUser)[0] # a função retorna dois valores e o primeiro
print()
print(similarity)

8
5
[5 6 1 8 9]
0 [8 0 3 4 6]
user: 0
[8, 3, 4] [8, 2, 3]
1 [5 6 1 8 9]
user: 1
[5, 1, 8] [8, 2, 3]
2 [ 8  0  0  5 10]
user: 2
[8, 0, 5] [8, 2, 3]

[0.9994237971287664, 0.23621543814299703, 0.8723686098443353]


In [6]:
nota_peso = np.zeros((3, 5))

for nUser in range(3):
    for nFilme in range(5):
        print('nota_peso[',nUser,'][',nFilme,'] = nao_assistidos[', nFilme,'] * matrix[',nUser,'][',nFilme,'] * similarity[',nUser,']')
        print('nota_peso[',nUser,'][',nFilme,'] = ', nao_assistidos[nFilme],' * ',matrix[nUser][nFilme], ' * ', similarity[nUser])
        nota_peso[nUser][nFilme] = nao_assistidos[nFilme] * matrix[nUser][nFilme] * similarity[nUser]
    print(nota_peso)

nota_peso[ 0 ][ 0 ] = nao_assistidos[ 0 ] * matrix[ 0 ][ 0 ] * similarity[ 0 ]
nota_peso[ 0 ][ 0 ] =  0  *  8  *  0.9994237971287664
nota_peso[ 0 ][ 1 ] = nao_assistidos[ 1 ] * matrix[ 0 ][ 1 ] * similarity[ 0 ]
nota_peso[ 0 ][ 1 ] =  1  *  0  *  0.9994237971287664
nota_peso[ 0 ][ 2 ] = nao_assistidos[ 2 ] * matrix[ 0 ][ 2 ] * similarity[ 0 ]
nota_peso[ 0 ][ 2 ] =  0  *  3  *  0.9994237971287664
nota_peso[ 0 ][ 3 ] = nao_assistidos[ 3 ] * matrix[ 0 ][ 3 ] * similarity[ 0 ]
nota_peso[ 0 ][ 3 ] =  0  *  4  *  0.9994237971287664
nota_peso[ 0 ][ 4 ] = nao_assistidos[ 4 ] * matrix[ 0 ][ 4 ] * similarity[ 0 ]
nota_peso[ 0 ][ 4 ] =  1  *  6  *  0.9994237971287664
[[0.         0.         0.         0.         5.99654278]
 [0.         0.         0.         0.         0.        ]
 [0.         0.         0.         0.         0.        ]]
nota_peso[ 1 ][ 0 ] = nao_assistidos[ 0 ] * matrix[ 1 ][ 0 ] * similarity[ 1 ]
nota_peso[ 1 ][ 0 ] =  0  *  5  *  0.23621543814299703
nota_peso[ 1 ][ 1 ] = nao_

In [7]:
notas_acumuladas = np.sum(nota_peso.T, axis=1)
print(notas_acumuladas)

[ 0.          1.41729263  0.          0.         16.84616782]


In [8]:
temp_peso = nota_peso
temp_peso[nota_peso > 0] = 1
print(temp_peso)

temp_similaridade = np.zeros((3, 5))
for nUser in range(3):
    for nFilme in range(5):
        temp_similaridade[nUser][nFilme] = temp_peso[nUser][nFilme] * similarity[nUser]

print(temp_similaridade)
similaridade_acumulada = np.sum(temp_similaridade.T, axis=1)
print(similaridade_acumulada)

[[0. 0. 0. 0. 1.]
 [0. 1. 0. 0. 1.]
 [0. 0. 0. 0. 1.]]
[[0.         0.         0.         0.         0.9994238 ]
 [0.         0.23621544 0.         0.         0.23621544]
 [0.         0.         0.         0.         0.87236861]]
[0.         0.23621544 0.         0.         2.10800785]


In [9]:
nota_final = [0] * 5
# normalização para cada filme com nota acumulada (pela soma dos pesos)
for nFilme in range(5):
    if(similaridade_acumulada[nFilme] > 0):
        nota_final[nFilme] = notas_acumuladas[nFilme] / similaridade_acumulada[nFilme]
    else:
        nota_final[nFilme] = 0
    
print(nota_final)

[0, 6.0, 0, 0, 7.991510972567145]


In [10]:
nAssistidos = sum(nao_assistidos) # type: ignore

notasOrdenadasIndex = sorted(range(len(nota_final)), key=nota_final.__getitem__)[::-1][0:nAssistidos]
print(notasOrdenadasIndex)

for i in notasOrdenadasIndex:
    print(nomeFilmes[i], 'nota: ', nota_final[i])

[4, 1]
La Casa de Papel nota:  7.991510972567145
A Invocação do Mal nota:  6.0


# Aula 6

In [1]:
from pyspark.sql import SparkSession, Row # row formato para o als
from pyspark.ml.evaluation import RegressionEvaluator # para verificar a qualidade do modelo, 
from pyspark.ml.recommendation import ALS # modelo de recomendação (Alternating Least Squares)

In [2]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

23/09/09 11:42:12 WARN Utils: Your hostname, MacBook-Air-de-Marcelo.local resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface en0)
23/09/09 11:42:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/09 11:42:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
lines = spark.read.text('../../../Data/Fase 3/sample_movielens_ratings.txt').rdd

In [4]:
parts = lines.map(lambda row: row.value.split('::'))

In [7]:
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), \
                                    movieId=int(p[1]), \
                                    rating=float(p[2]),  \
                                    timestamp=int(p[3])))

In [8]:
ratings = spark.createDataFrame(ratingsRDD)

                                                                                

In [9]:
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
|     0|     12|   2.0|1424380312|
|     0|     15|   1.0|1424380312|
|     0|     17|   1.0|1424380312|
|     0|     19|   1.0|1424380312|
|     0|     21|   1.0|1424380312|
|     0|     23|   1.0|1424380312|
|     0|     26|   3.0|1424380312|
|     0|     27|   1.0|1424380312|
|     0|     28|   1.0|1424380312|
|     0|     29|   1.0|1424380312|
|     0|     30|   1.0|1424380312|
|     0|     31|   1.0|1424380312|
|     0|     34|   1.0|1424380312|
|     0|     37|   1.0|1424380312|
|     0|     41|   2.0|1424380312|
+------+-------+------+----------+
only showing top 20 rows



In [10]:
(training, test) = ratings.randomSplit([0.8, 0.2])

In [11]:
als = ALS(maxIter=5, \
        regParam=0.01, \
        userCol='userId', \
        itemCol='movieId', \
        ratingCol='rating', \
        coldStartStrategy='drop')

In [12]:
model = als.fit(training)

23/09/09 12:02:00 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/09/09 12:02:00 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/09/09 12:02:00 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/09/09 12:02:00 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [13]:
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('Erro Médio Quadrático: ' + str(rmse))



Erro Médio Quadrático: 1.917489553217494


                                                                                

In [14]:
userRec = model.recommendForAllUsers(10)

In [15]:
userRec.show()

                                                                                

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[{25, 6.417806}, ...|
|    26|[{94, 5.4155145},...|
|    27|[{18, 3.9198024},...|
|    12|[{17, 5.1947656},...|
|    22|[{74, 5.106934}, ...|
|     1|[{25, 4.3891563},...|
|    13|[{4, 3.232518}, {...|
|     6|[{25, 4.615814}, ...|
|    16|[{28, 5.4040823},...|
|     3|[{32, 5.0642576},...|
|    20|[{94, 3.6108603},...|
|     5|[{69, 6.417946}, ...|
|    19|[{94, 4.0688896},...|
|    15|[{90, 5.3539877},...|
|    17|[{79, 5.4243345},...|
|     9|[{85, 5.5353346},...|
|     4|[{83, 4.812314}, ...|
|     8|[{53, 5.092467}, ...|
|    23|[{7, 6.6129904}, ...|
|     7|[{69, 5.243858}, ...|
+------+--------------------+
only showing top 20 rows



In [16]:
movieRec = model.recommendForAllItems(10)

In [17]:
movieRec.show()

                                                                                

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     31|[{12, 3.7572966},...|
|     85|[{9, 5.5353346}, ...|
|     65|[{11, 5.1726727},...|
|     53|[{24, 5.590444}, ...|
|     78|[{7, 1.6479115}, ...|
|     34|[{25, 3.1137831},...|
|     81|[{28, 4.544544}, ...|
|     28|[{16, 5.4040823},...|
|     76|[{14, 4.986357}, ...|
|     26|[{15, 2.5443437},...|
|     27|[{23, 5.1857514},...|
|     44|[{5, 4.0061364}, ...|
|     12|[{2, 3.0016155}, ...|
|     91|[{11, 4.946367}, ...|
|     22|[{26, 5.243876}, ...|
|     93|[{2, 5.0935197}, ...|
|     47|[{5, 4.1168113}, ...|
|      1|[{15, 3.960104}, ...|
|     52|[{21, 5.3332176},...|
|     13|[{11, 4.047972}, ...|
+-------+--------------------+
only showing top 20 rows



In [18]:
users = ratings.select(als.getUserCol()).distinct()

In [19]:
users.show()

+------+
|userId|
+------+
|    26|
|    29|
|    19|
|     0|
|    22|
|     7|
|    25|
|     6|
|     9|
|    27|
|    17|
|    28|
|     5|
|     1|
|    10|
|     3|
|    12|
|     8|
|    11|
|     2|
+------+
only showing top 20 rows



In [20]:
UserRecsOnlyItemId = userRec.select(userRec['userId'], userRec['recommendations']['movieId'])

In [22]:
UserRecsOnlyItemId.show(10, False)

                                                                                

+------+----------------------------------------+
|userId|recommendations.movieId                 |
+------+----------------------------------------+
|28    |[25, 92, 81, 9, 68, 69, 89, 49, 48, 10] |
|26    |[94, 51, 22, 23, 24, 68, 75, 28, 74, 46]|
|27    |[18, 4, 40, 88, 2, 83, 75, 55, 41, 63]  |
|12    |[17, 64, 27, 16, 94, 31, 46, 23, 91, 79]|
|22    |[74, 75, 94, 51, 29, 22, 62, 49, 77, 53]|
|1     |[25, 49, 69, 62, 68, 75, 24, 51, 67, 22]|
|13    |[4, 53, 18, 40, 83, 29, 41, 2, 75, 52]  |
|6     |[25, 43, 49, 2, 61, 67, 63, 85, 64, 9]  |
|16    |[28, 51, 54, 90, 85, 29, 81, 5, 96, 94] |
|3     |[32, 30, 69, 53, 27, 55, 25, 18, 75, 88]|
+------+----------------------------------------+
only showing top 10 rows



In [23]:
spark.stop()