<a href="https://colab.research.google.com/github/DevzsJhonny/Colab_notebooks/blob/main/Spark_aula_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [49]:
# passo 1 Instalação do JDK, download dos binários do spark e instalação do Spark 3.1.1
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [50]:
# passo 2 configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [51]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enable", True)
spark

In [52]:
# Baixar conjunto de dados
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

--2024-08-05 23:17:31--  https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobceles.github.io (jacobceles.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobceles.github.io (jacobceles.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv [following]
--2024-08-05 23:17:31--  https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobcelestine.com (jacobcelestine.com)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobcelestine.com (jacobcelestine.com)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22608 (22K) [text/csv]
Saving to: ‘cars.csv.1’


2024-08-05 23:17:31 (16.7 MB/s) - ‘cars.csv.1’ saved [22608/22608]



In [53]:
# Ler o arquivo CSV e transformar em dataframe
df = spark.read.csv('cars.csv', header=True, inferSchema=True, sep=';')

In [54]:
df.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|  3504|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|  3693|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|  3436|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|  3433|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0|  3449|        10.5|   70|    US|
|    Ford Galaxie 500|15.0|        8|       429.0|     198.0|  4341|        10.0|   70|    US|
|    Chevrolet Impala|14.0|        8|       454.0|     220.0|  4354|         9.0|   70|    US|
|   Plymouth Fury iii|14.0|        8|       440.0|

In [55]:
from pyspark.sql.types import *

In [56]:
labels = [
     ('Car',StringType()),
     ('MPG',DoubleType()),
     ('Cylinders',IntegerType()),
     ('Displacement',DoubleType()),
     ('Horsepower',DoubleType()),
     ('Weight',DoubleType()),
     ('Acceleration',DoubleType()),
     ('Model',IntegerType()),
     ('Origin',StringType())
]

In [57]:
schema = StructType([StructField(x[0], x[1], True) for x in labels])
schema

StructType(List(StructField(Car,StringType,true),StructField(MPG,DoubleType,true),StructField(Cylinders,IntegerType,true),StructField(Displacement,DoubleType,true),StructField(Horsepower,DoubleType,true),StructField(Weight,DoubleType,true),StructField(Acceleration,DoubleType,true),StructField(Model,IntegerType,true),StructField(Origin,StringType,true)))

In [58]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: double (nullable = true)
 |-- Weight: decimal(4,0) (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [59]:
df.select(df.Car, df.Cylinders).show()

+--------------------+---------+
|                 Car|Cylinders|
+--------------------+---------+
|Chevrolet Chevell...|        8|
|   Buick Skylark 320|        8|
|  Plymouth Satellite|        8|
|       AMC Rebel SST|        8|
|         Ford Torino|        8|
|    Ford Galaxie 500|        8|
|    Chevrolet Impala|        8|
|   Plymouth Fury iii|        8|
|    Pontiac Catalina|        8|
|  AMC Ambassador DPL|        8|
|Citroen DS-21 Pallas|        4|
|Chevrolet Chevell...|        8|
|    Ford Torino (sw)|        8|
|Plymouth Satellit...|        8|
|  AMC Rebel SST (sw)|        8|
| Dodge Challenger SE|        8|
|  Plymouth 'Cuda 340|        8|
|Ford Mustang Boss...|        8|
|Chevrolet Monte C...|        8|
|Buick Estate Wago...|        8|
+--------------------+---------+
only showing top 20 rows



In [60]:
df.select(df['Car'], df['Cylinders']).show()

+--------------------+---------+
|                 Car|Cylinders|
+--------------------+---------+
|Chevrolet Chevell...|        8|
|   Buick Skylark 320|        8|
|  Plymouth Satellite|        8|
|       AMC Rebel SST|        8|
|         Ford Torino|        8|
|    Ford Galaxie 500|        8|
|    Chevrolet Impala|        8|
|   Plymouth Fury iii|        8|
|    Pontiac Catalina|        8|
|  AMC Ambassador DPL|        8|
|Citroen DS-21 Pallas|        4|
|Chevrolet Chevell...|        8|
|    Ford Torino (sw)|        8|
|Plymouth Satellit...|        8|
|  AMC Rebel SST (sw)|        8|
| Dodge Challenger SE|        8|
|  Plymouth 'Cuda 340|        8|
|Ford Mustang Boss...|        8|
|Chevrolet Monte C...|        8|
|Buick Estate Wago...|        8|
+--------------------+---------+
only showing top 20 rows



In [61]:
from pyspark.sql.functions import col
df.select(col('Car'), col('Cylinders')).show(5)

+--------------------+---------+
|                 Car|Cylinders|
+--------------------+---------+
|Chevrolet Chevell...|        8|
|   Buick Skylark 320|        8|
|  Plymouth Satellite|        8|
|       AMC Rebel SST|        8|
|         Ford Torino|        8|
+--------------------+---------+
only showing top 5 rows



In [62]:
from pyspark.sql.functions import  concat, lit

In [63]:
df.withColumn('Car_Model', concat(df.Car, lit(' '), df.Model)).show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|           Car_Model|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|  3504|        12.0|   70|    US|Chevrolet Chevell...|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|  3693|        11.5|   70|    US|Buick Skylark 320 70|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|  3436|        11.0|   70|    US|Plymouth Satellit...|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|  3433|        12.0|   70|    US|    AMC Rebel SST 70|
|         Ford Torino|17.0|        8|       302.0|     140.0|  3449|        10.5|   70|    US|      Ford Torino 70|
+--------------------+----+---------+------------+----------+------+----

In [64]:
df.groupBy('Origin').count().show()

+------+-----+
|Origin|count|
+------+-----+
|Europe|   73|
|    US|  254|
| Japan|   79|
+------+-----+



In [65]:
df.groupBy('Origin','Car').count().show()

+------+--------------------+-----+
|Origin|                 Car|count|
+------+--------------------+-----+
|    US|          Ford Pinto|    6|
| Japan|      Datsun B210 GX|    1|
|    US|     Dodge Magnum XE|    1|
|    US|         Ford Torino|    1|
|Europe|           Opel 1900|    2|
|    US|     Mercury Marquis|    1|
| Japan|              Subaru|    2|
|    US|           Ford F108|    1|
|Europe| Volkswagen Rabbit l|    1|
|    US|            Ford LTD|    2|
| Japan|          Datsun 710|    2|
| Japan|Toyota Corolla Li...|    2|
|    US|Chevrolet Cavalie...|    1|
|Europe|            Fiat 131|    1|
|    US|       Chevrolet C10|    1|
|Europe|         Peugeot 504|    4|
|    US|Dodge Aries Wagon...|    1|
|    US|     Mercury Monarch|    1|
|    US|Plymouth Volare P...|    1|
|    US|      Dodge Colt m/m|    1|
+------+--------------------+-----+
only showing top 20 rows



In [66]:
df.groupBy('Origin','Car').sum().show()

+------+--------------------+--------+--------------+-----------------+---------------+-----------+-----------------+----------+
|Origin|                 Car|sum(MPG)|sum(Cylinders)|sum(Displacement)|sum(Horsepower)|sum(Weight)|sum(Acceleration)|sum(Model)|
+------+--------------------+--------+--------------+-----------------+---------------+-----------+-----------------+----------+
|    US|          Ford Pinto|   137.5|            26|            793.0|          417.0|      14995|             99.1|       444|
| Japan|      Datsun B210 GX|    39.4|             4|             85.0|           70.0|       2070|             18.6|        78|
|    US|     Dodge Magnum XE|    17.5|             8|            318.0|          140.0|       4080|             13.7|        78|
|    US|         Ford Torino|    17.0|             8|            302.0|          140.0|       3449|             10.5|        70|
|Europe|           Opel 1900|    53.0|             8|            232.0|          171.0|       434

In [67]:
from pyspark.sql.functions import min, max

In [68]:
df.select(min(col('Weight')), max('Weight')).show()

+-----------+-----------+
|min(Weight)|max(Weight)|
+-----------+-----------+
|       1613|       5140|
+-----------+-----------+



In [69]:
df.createOrReplaceGlobalTempView('vw_cars')

In [70]:
spark.sql('SELECT * FROM global_temp.vw_cars').show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|  3504|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|  3693|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|  3436|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|  3433|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0|  3449|        10.5|   70|    US|
|    Ford Galaxie 500|15.0|        8|       429.0|     198.0|  4341|        10.0|   70|    US|
|    Chevrolet Impala|14.0|        8|       454.0|     220.0|  4354|         9.0|   70|    US|
|   Plymouth Fury iii|14.0|        8|       440.0|

In [71]:
spark.sql('SELECT count(*) as total FROM global_temp.vw_cars').show()

+-----+
|total|
+-----+
|  406|
+-----+



In [72]:
# MLlib machine learning com spark
# Vector Assemble - Transforma multiplas colunas em uma coluna tipo vetor

In [73]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [74]:
assembler = VectorAssembler(inputCols=['Cylinders', 'Displacement', 'Horsepower', 'Model', 'Acceleration', 'Weight'],
                            outputCol='Features')
output = assembler.transform(df)

In [75]:
final_data = output.select('Features', 'MPG')
final_data.show(4)

+--------------------+----+
|            Features| MPG|
+--------------------+----+
|[8.0,307.0,130.0,...|18.0|
|[8.0,350.0,165.0,...|15.0|
|[8.0,318.0,150.0,...|18.0|
|[8.0,304.0,150.0,...|16.0|
+--------------------+----+
only showing top 4 rows



In [76]:
train_data, test_data = final_data.randomSplit([0.8, 0.2]) # 80% dos dados para treino e 20% para teste

In [77]:
model_reg = LinearRegression(featuresCol='Features', labelCol='MPG')

In [78]:
model_reg = model_reg.fit(train_data)

In [79]:
pred = model_reg.evaluate(test_data)

In [80]:
pred.predictions.show()

+--------------------+----+------------------+
|            Features| MPG|        prediction|
+--------------------+----+------------------+
|[3.0,70.0,97.0,72...|19.0| 23.15703018239924|
|[4.0,83.0,61.0,74...|32.0| 27.88232447521679|
|[4.0,85.0,65.0,79...|31.8| 32.26807305030341|
|[4.0,85.0,70.0,78...|39.4| 30.96155013849161|
|[4.0,89.0,60.0,80...|38.1| 33.48599865640864|
|[4.0,89.0,62.0,80...|29.8| 33.91010609816719|
|[4.0,90.0,48.0,80...|44.3|33.014312520341726|
|[4.0,91.0,60.0,78...|36.1|32.585839793825315|
|[4.0,91.0,67.0,80...|44.6| 33.70844056856218|
|[4.0,91.0,68.0,81...|34.1|33.935438202219586|
|[4.0,97.0,78.0,77...|30.5|28.751489074310932|
|[4.0,97.0,78.0,80...|34.3|31.654686932637418|
|[4.0,97.0,88.0,70...|27.0|22.996701029870188|
|[4.0,97.0,88.0,72...|27.0|25.241102774811715|
|[4.0,98.0,65.0,81...|29.9|31.800009151654635|
|[4.0,98.0,68.0,78...|30.0|30.174759355450888|
|[4.0,98.0,80.0,72...|28.0|24.595691945744406|
|[4.0,105.0,63.0,8...|34.7| 32.26101989743589|
|[4.0,105.0,6

In [81]:
from pyspark.ml.evaluation import RegressionEvaluator

In [82]:
eval = RegressionEvaluator(predictionCol='prediction', labelCol='MPG', metricName='rmse')

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.2f" % rmse)

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.2f" % mse)

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.2f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.2f" %r2)

RMSE: 4.00
MSE: 16.01
MAE: 3.21
r2: 0.77


In [85]:
# Classificação

In [83]:
from sklearn.datasets import load_iris
import numpy as np
import pandas as pd
from pyspark.ml.feature import StringIndexer

In [86]:
data = load_iris()
data

{'data': array([[5.1, 3.5, 1.4, 0.2],
        [4.9, 3. , 1.4, 0.2],
        [4.7, 3.2, 1.3, 0.2],
        [4.6, 3.1, 1.5, 0.2],
        [5. , 3.6, 1.4, 0.2],
        [5.4, 3.9, 1.7, 0.4],
        [4.6, 3.4, 1.4, 0.3],
        [5. , 3.4, 1.5, 0.2],
        [4.4, 2.9, 1.4, 0.2],
        [4.9, 3.1, 1.5, 0.1],
        [5.4, 3.7, 1.5, 0.2],
        [4.8, 3.4, 1.6, 0.2],
        [4.8, 3. , 1.4, 0.1],
        [4.3, 3. , 1.1, 0.1],
        [5.8, 4. , 1.2, 0.2],
        [5.7, 4.4, 1.5, 0.4],
        [5.4, 3.9, 1.3, 0.4],
        [5.1, 3.5, 1.4, 0.3],
        [5.7, 3.8, 1.7, 0.3],
        [5.1, 3.8, 1.5, 0.3],
        [5.4, 3.4, 1.7, 0.2],
        [5.1, 3.7, 1.5, 0.4],
        [4.6, 3.6, 1. , 0.2],
        [5.1, 3.3, 1.7, 0.5],
        [4.8, 3.4, 1.9, 0.2],
        [5. , 3. , 1.6, 0.2],
        [5. , 3.4, 1.6, 0.4],
        [5.2, 3.5, 1.5, 0.2],
        [5.2, 3.4, 1.4, 0.2],
        [4.7, 3.2, 1.6, 0.2],
        [4.8, 3.1, 1.6, 0.2],
        [5.4, 3.4, 1.5, 0.4],
        [5.2, 4.1, 1.5, 0.1],
  

In [88]:
cols = [i.replace('(cm)','').strip().replace(' ','_') for i in data.feature_names] + ['label']

iris_df = pd.DataFrame(np.c_[data.data, data.target], columns=cols)
iris_df.iteritems = iris_df.items

In [89]:
iris_df.head(10)

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,label
0,5.1,3.5,1.4,0.2,0.0
1,4.9,3.0,1.4,0.2,0.0
2,4.7,3.2,1.3,0.2,0.0
3,4.6,3.1,1.5,0.2,0.0
4,5.0,3.6,1.4,0.2,0.0
5,5.4,3.9,1.7,0.4,0.0
6,4.6,3.4,1.4,0.3,0.0
7,5.0,3.4,1.5,0.2,0.0
8,4.4,2.9,1.4,0.2,0.0
9,4.9,3.1,1.5,0.1,0.0


In [91]:
df = spark.createDataFrame(iris_df)
df.show(5)

+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|label|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|  0.0|
|         4.9|        3.0|         1.4|        0.2|  0.0|
|         4.7|        3.2|         1.3|        0.2|  0.0|
|         4.6|        3.1|         1.5|        0.2|  0.0|
|         5.0|        3.6|         1.4|        0.2|  0.0|
+------------+-----------+------------+-----------+-----+
only showing top 5 rows



In [92]:
vectorAssembler = VectorAssembler(inputCols=['sepal_length',
                                             'sepal_width',
                                             'petal_length',
                                             'petal_width'],
                                  outputCol='features')

In [97]:
df_transform = vectorAssembler.transform(df)
df_final = df_transform.select(['features', 'label'])
df_final.show(5)

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  0.0|
|[4.9,3.0,1.4,0.2]|  0.0|
|[4.7,3.2,1.3,0.2]|  0.0|
|[4.6,3.1,1.5,0.2]|  0.0|
|[5.0,3.6,1.4,0.2]|  0.0|
+-----------------+-----+
only showing top 5 rows



In [94]:
from pyspark.ml.classification import NaiveBayes, MultilayerPerceptronClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [98]:
# separando treino e teste
split = df_transform.randomSplit([0.8, 0.2])
train = split[0]
test = split[1]

In [99]:
layers = [4,5,8,12]
mlp_model = MultilayerPerceptronClassifier(layers=layers, seed=10)
mlp_model = mlp_model.fit(train)

In [104]:
mlp_pred = mlp_model.transform(test)
mlp_eval = MulticlassClassificationEvaluator(metricName='accuracy')
mlp_eval.evaluate(mlp_pred)

0.59375

In [105]:
# Processamento de Linguagem Natural (NLP)

In [106]:
from pyspark.ml.feature import Tokenizer

In [107]:
sentencesDF = spark.createDataFrame([
                                    (1, 'Introdução ao spark mllib'),
                                    (2, 'mllib contem funções usadas para modelos de classificação e regressão'),
                                    (1, 'e podemos criar pipelines de dados e de machine learning com spark')],
                                     ['id', 'sentence'])

In [108]:
sentencesDF.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|Introdução ao spa...|
|  2|mllib contem funç...|
|  1|e podemos criar p...|
+---+--------------------+



In [109]:
sent_token = Tokenizer(inputCol='sentence', outputCol='words')
sent_tokenized_df = sent_token.transform(sentencesDF)
sent_tokenized_df.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  1|Introdução ao spa...|[introdução, ao, ...|
|  2|mllib contem funç...|[mllib, contem, f...|
|  1|e podemos criar p...|[e, podemos, cria...|
+---+--------------------+--------------------+



In [110]:
sent_tokenized_df.show(truncate=False)

+---+---------------------------------------------------------------------+--------------------------------------------------------------------------------+
|id |sentence                                                             |words                                                                           |
+---+---------------------------------------------------------------------+--------------------------------------------------------------------------------+
|1  |Introdução ao spark mllib                                            |[introdução, ao, spark, mllib]                                                  |
|2  |mllib contem funções usadas para modelos de classificação e regressão|[mllib, contem, funções, usadas, para, modelos, de, classificação, e, regressão]|
|1  |e podemos criar pipelines de dados e de machine learning com spark   |[e, podemos, criar, pipelines, de, dados, e, de, machine, learning, com, spark] |
+---+-----------------------------------------------------

In [111]:
# TF-IDF
from pyspark.ml.feature import HashingTF, IDF

In [112]:
sentencesDF.take(1)

[Row(id=1, sentence='Introdução ao spark mllib')]

In [113]:
hashingTF = HashingTF(inputCol='words', outputCol = 'rawFeatures', numFeatures=20)

In [114]:
sentHFTFDF = hashingTF.transform(sent_tokenized_df)

In [115]:
sentHFTFDF.take(1)

[Row(id=1, sentence='Introdução ao spark mllib', words=['introdução', 'ao', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 1.0, 14: 1.0, 15: 2.0}))]