# Componente de machine learning do Spark MLlib
* Acesso a dados
* Regressão, classificação e agrupamento com Spark

In [None]:
from pyspark.ml.regression import LinearRegression

from pyspark.sql import SparkSession

# ponto de entrada - sessão spark

In [25]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# exemplo de operação com dataframe

In [70]:
df = spark.read.json("pessoas.json")
df.show()

+-----+------+
|idade|  nome|
+-----+------+
| null|Silvio|
|   30| teste|
|   19|teste2|
|   13|teste3|
+-----+------+



# Algumas operações com dataframe

In [71]:
# Print schema
df.printSchema()

# selecione apenas a coluna nome
df.select("nome").show()

# seleciona todo mundo e adiciona 1 a idade
df.select(df['nome'], df['idade'] + 1).show()

# filtra apenas maiores de 21
df.filter(df['idade'] > 21).show()

# Conta pessoas por idade
df.groupBy("idade").count().show()

root
 |-- idade: long (nullable = true)
 |-- nome: string (nullable = true)

+------+
|  nome|
+------+
|Silvio|
| teste|
|teste2|
|teste3|
+------+

+------+-----------+
|  nome|(idade + 1)|
+------+-----------+
|Silvio|       null|
| teste|         31|
|teste2|         20|
|teste3|         14|
+------+-----------+

+-----+-----+
|idade| nome|
+-----+-----+
|   30|teste|
+-----+-----+

+-----+-----+
|idade|count|
+-----+-----+
|   19|    1|
| null|    1|
|   13|    1|
|   30|    1|
+-----+-----+



# regressao linear com PySpark
* Preparação de dados

In [58]:
#carregando CSV
training = spark.read.load("Auto2.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

training.show()

+----+---------+------------+----------+------+------------+----+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|year|origin|                name|
+----+---------+------------+----------+------+------------+----+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|  70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|  70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|  70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|  70|     1|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|  70|     1|         ford torino|
|15.0|        8|       429.0|       198|  4341|        10.0|  70|     1|    ford galaxie 500|
|14.0|        8|       454.0|       220|  4354|         9.0|  70|     1|    chevrolet impala|
|14.0|        8|       440.0|       215|  4312|         8.5|

# regressao linear com PySpark
* Indexando para coluna categórica

In [72]:
from pyspark.ml.feature import StringIndexer

indexer=StringIndexer(inputCol='origin',outputCol='origin_cat')
indexed=indexer.fit(training).transform(training)
indexed.show()

+----+---------+------------+----------+------+------------+----+------+--------------------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|year|origin|                name|origin_cat|
+----+---------+------------+----------+------+------------+----+------+--------------------+----------+
|18.0|        8|       307.0|       130|  3504|        12.0|  70|     1|chevrolet chevell...|       0.0|
|15.0|        8|       350.0|       165|  3693|        11.5|  70|     1|   buick skylark 320|       0.0|
|18.0|        8|       318.0|       150|  3436|        11.0|  70|     1|  plymouth satellite|       0.0|
|16.0|        8|       304.0|       150|  3433|        12.0|  70|     1|       amc rebel sst|       0.0|
|17.0|        8|       302.0|       140|  3449|        10.5|  70|     1|         ford torino|       0.0|
|15.0|        8|       429.0|       198|  4341|        10.0|  70|     1|    ford galaxie 500|       0.0|
|14.0|        8|       454.0|       220|  4354|        

# regressao linear com PySpark
* Separando entre features e target

In [62]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

#input cols identifica features
assembler=VectorAssembler(inputCols=['cylinders','displacement','horsepower','weight','acceleration','year','origin_cat']
                          ,outputCol='features')

output=assembler.transform(indexed)
output.select('features','mpg').show(5)

+--------------------+----+
|            features| mpg|
+--------------------+----+
|[8.0,307.0,130.0,...|18.0|
|[8.0,350.0,165.0,...|15.0|
|[8.0,318.0,150.0,...|18.0|
|[8.0,304.0,150.0,...|16.0|
|[8.0,302.0,140.0,...|17.0|
+--------------------+----+
only showing top 5 rows



# regressao linear com PySpark
* Separa em treino e teste

In [64]:

final_data=output.select('features','mpg')
train_data,test_data=final_data.randomSplit([0.7,0.3])
train_data.describe().show()

+-------+-----------------+
|summary|              mpg|
+-------+-----------------+
|  count|              274|
|   mean|23.67956204379561|
| stddev|7.725175424456545|
|    min|              9.0|
|    max|             46.6|
+-------+-----------------+



# regressao linear com PySpark
* Executa o modelo

In [82]:

from pyspark.ml.regression import LinearRegression

lr=LinearRegression(featuresCol='features',labelCol='mpg')

model=lr.fit(train_data)

results=model.evaluate(train_data)
  
print('Rsquared :',results.r2)

IllegalArgumentException: mpg does not exist. Available: features, Class

In [76]:
unlabeled_data=test_data.select('features')
unlabeled_data.show(5)

+--------------------+
|            features|
+--------------------+
|[3.0,70.0,90.0,21...|
|[3.0,70.0,97.0,23...|
|[4.0,71.0,65.0,17...|
|[4.0,79.0,58.0,18...|
|[4.0,79.0,67.0,19...|
+--------------------+
only showing top 5 rows



In [77]:
predictions=model.transform(unlabeled_data)
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[3.0,70.0,90.0,21...| 25.78089340728376|
|[3.0,70.0,97.0,23...|23.600437132038923|
|[4.0,71.0,65.0,17...| 27.58818941678378|
|[4.0,79.0,58.0,18...| 33.58813799313259|
|[4.0,79.0,67.0,19...| 29.68429055520404|
|[4.0,85.0,65.0,21...|32.584337768452656|
|[4.0,86.0,64.0,18...| 32.74858968574691|
|[4.0,86.0,65.0,19...| 31.92343594705451|
|[4.0,86.0,65.0,20...| 32.63943820615337|
|[4.0,88.0,76.0,20...|26.595923166374956|
|[4.0,90.0,75.0,21...|  28.8561723018591|
|[4.0,91.0,67.0,18...| 33.28960735033425|
|[4.0,91.0,67.0,19...| 34.31015068793157|
|[4.0,91.0,67.0,19...| 34.44825640843146|
|[4.0,91.0,67.0,19...| 34.35278277732755|
|[4.0,97.0,54.0,22...| 28.34332578101012|
|[4.0,97.0,67.0,20...| 33.56369068343143|
|[4.0,97.0,78.0,19...| 32.13286850965254|
|[4.0,98.0,63.0,20...|28.908510765843335|
|[4.0,98.0,66.0,18...| 30.74784775898139|
+--------------------+------------

# classificação com pyspark

In [78]:
#carregando CSV
diab = spark.read.load("pima-indians-diabetes.csv",
                     format="csv", sep=",", inferSchema="true", header="true")

diab.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-----+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Class|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-----+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|    1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|    0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|    1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|    0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|    1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|    0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|    1|
|         10|    115

In [80]:
assembler=VectorAssembler(inputCols=['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age']
                          ,outputCol='features')

output=assembler.transform(diab)
output.select('features','Class').show(5)

+--------------------+-----+
|            features|Class|
+--------------------+-----+
|[6.0,148.0,72.0,3...|    1|
|[1.0,85.0,66.0,29...|    0|
|[8.0,183.0,64.0,0...|    1|
|[1.0,89.0,66.0,23...|    0|
|[0.0,137.0,40.0,3...|    1|
+--------------------+-----+
only showing top 5 rows



In [81]:
final_data=output.select('features','Class')
train_data,test_data=final_data.randomSplit([0.7,0.3])
train_data.describe().show()

+-------+-------------------+
|summary|              Class|
+-------+-------------------+
|  count|                545|
|   mean| 0.3596330275229358|
| stddev|0.48033368955231787|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [83]:
from pyspark.ml.classification import LogisticRegression

logr = LogisticRegression(featuresCol='features', labelCol='Class')

model=logr.fit(train_data)

results=model.evaluate(train_data)
  
#print('Rsquared :',results.r2)

In [88]:
predictions = model.transform(test_data)
predictions.select( 'Class', 'rawPrediction', 'prediction', 'probability').show(50)

+-----+--------------------+----------+--------------------+
|Class|       rawPrediction|prediction|         probability|
+-----+--------------------+----------+--------------------+
|    0|[5.35945491440936...|       0.0|[0.99531854985432...|
|    0|[4.87422172956509...|       0.0|[0.99241690402990...|
|    0|[3.56800453978200...|       0.0|[0.97256199004675...|
|    0|[3.24942948595821...|       0.0|[0.96265260662548...|
|    0|[2.19341948050555...|       0.0|[0.89965701966972...|
|    0|[1.83141410404642...|       0.0|[0.86193010069208...|
|    0|[2.21230456961086...|       0.0|[0.90134903635674...|
|    0|[0.94492671843620...|       0.0|[0.72009375649918...|
|    0|[1.50364478024666...|       0.0|[0.81811745314497...|
|    0|[1.31786920128372...|       0.0|[0.78882697869835...|
|    1|[2.22498336256877...|       0.0|[0.90247069858662...|
|    0|[3.23431701449847...|       0.0|[0.96210545936005...|
|    0|[1.92416277627702...|       0.0|[0.87260191862706...|
|    0|[2.30500504579567

# Kmeans Spark

In [None]:
import numpy as np

import matplotlib.pyplot as plt

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

import datetime

spark = SparkSession.builder.appName('SparkKmeans').getOrCreate()

df2 = spark.read.load("/home/silvio/dataset/minute_weather.csv",
                     format="csv", sep=",", inferSchema="true", header="true")
                     
df = df2.drop("rowID","hpwren_timestamp")

df = df.fillna(0)

B=datetime.datetime.now()

cost = []
vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features")
vector_df = vecAssembler.transform(df)

print('teste silhoute')    

K = range(2,15)
for k in K:
    #kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol('features')
    #model = kmeans.fit(vector_df)
    kmeans = KMeans().setK(k).setSeed(1)
    model = kmeans.fit(vector_df )
    cost.append(model.summary.trainingCost)

E=datetime.datetime.now()
print(E-B)
print(cost)

In [None]:
import numpy as np

import matplotlib.pyplot as plt

from sklearn.cluster import KMeans as KM
from sklearn.metrics import silhouette_score

#from pyspark.ml.clustering import KMeans
#from pyspark.ml.evaluation import ClusteringEvaluator
#from pyspark.ml.feature import VectorAssembler
#from pyspark.sql import SparkSession

import pandas as pd
import datetime

#spark = SparkSession.builder.appName('SparkKmeans').getOrCreate()

df2 = pd.read_csv("/home/silvio/dataset/minute_weather.csv")
                     
df = df2.drop("rowID","hpwren_timestamp")

df = df.fillna(0)

B=datetime.datetime.now()

cost = []
#vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features")
#vector_df = vecAssembler.transform(df)
    
K = range(2,15)
for k in K:
    #kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol('features')
    model = kmeans.fit(df)
    #kmeans = KMeans().setK(k).setSeed(1)
    #model = kmeans.fit(vector_df )
    #cost.append(model.summary.trainingCost)

E=datetime.datetime.now()
print(E-B)
print(cost)

In [None]:
from sklearn import metrics
from scipy.spatial.distance import cdist
from sklearn.metrics import silhouette_score

df2 = pd.read_csv("/home/silvio/dataset/minute_weather.csv")
                     
df = df2.drop("rowID","hpwren_timestamp")

df = df.fillna(0)

print('teste')
# k means determine k
silhouette = []
K = range(2,16)
for k in K:
    print(k)
    kmeanModel = KMeans(n_clusters=k).fit(df)
    kmeanModel.fit(df)
    silhouette.append(silhouette_score(df, kmeanModel.labels_))
    print(k)
    
# Plot the elbow
#plt.plot(K, distortions, 'bx-')
plt.plot(K, silhouette, 'bx-')
plt.xlabel('k')
plt.ylabel('Distortion')
plt.title('The Elbow Method showing the optimal k')