In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
import pandas as pd

In [0]:
spark

In [0]:
# Carregamento de dados
file_path = "dbfs:/FileStore/shared_uploads/lucasldfm@gmail.com/street_light_energy_usage_1-1.csv"
# Criação do dataframe
df_spark = (spark.read.csv(file_path, sep =",",inferSchema = True, header = True))



In [0]:
# Criação de sample
df_sample = df_spark.sample(0.008)

In [0]:
# Verificar quantidade de dados do dataframe
df_spark.count()

Out[2]: 1047405

In [0]:
# Verificar quantidade de dados do sample
df_sample.count()

Out[19]: 8354

In [0]:
# Verificar se o DataFrame possui dados nulos
colunas_com_nulos = []
for coluna in df_sample.columns:
    nulos = df_sample.filter(col(coluna).isNull()).count()
    if nulos > 0:
        colunas_com_nulos.append(coluna)


In [0]:
# Exibir colunas com dados nulos, se houver
if colunas_com_nulos:
    print("Colunas com dados nulos:", colunas_com_nulos)
else:
    print("O DataFrame não possui dados nulos.")

Colunas com dados nulos: ['Start Time', 'End Time', 'Cost']


In [0]:
#Print do schema do dataframe
df_spark.printSchema()

root
 |-- index: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Loc: string (nullable = true)
 |-- Account #: long (nullable = true)
 |-- Customer Code: integer (nullable = true)
 |-- Premise Code: integer (nullable = true)
 |-- Meter ID: string (nullable = true)
 |-- Service Type: string (nullable = true)
 |-- Channel Number: integer (nullable = true)
 |-- Power Flow: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Interval Length: string (nullable = true)
 |-- Time Zone: string (nullable = true)
 |-- Start Date Time: string (nullable = true)
 |-- End Date Time: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Time: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Time: string (nullable = true)
 |-- Time of Use: string (nullable = true)
 |-- Read Type: string (nullable = true)
 |-- Usage: double (nullable = true)
 |-- Day of the Week: string (nullable = true)
 |-- Hour of the Day: integer (null

In [0]:
#Print do schema do sample
df_sample.printSchema()

root
 |-- index: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Loc: string (nullable = true)
 |-- Account #: long (nullable = true)
 |-- Customer Code: integer (nullable = true)
 |-- Premise Code: integer (nullable = true)
 |-- Meter ID: string (nullable = true)
 |-- Service Type: string (nullable = true)
 |-- Channel Number: integer (nullable = true)
 |-- Power Flow: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Interval Length: string (nullable = true)
 |-- Time Zone: string (nullable = true)
 |-- Start Date Time: string (nullable = true)
 |-- End Date Time: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Time: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Time: string (nullable = true)
 |-- Time of Use: string (nullable = true)
 |-- Read Type: string (nullable = true)
 |-- Usage: double (nullable = true)
 |-- Day of the Week: string (nullable = true)
 |-- Hour of the Day: integer (null

In [0]:
df_spark.summary().show()

+-------+-----------------+------------------+--------------------+--------------------+-------------+------------------+-----------+------------+--------------+----------+-------+---------------+---------+--------------------+--------------------+--------------------+----------+--------------------+--------+-----------+----------+-------------------+---------------+------------------+------+
|summary|            index|              Name|                 Loc|           Account #|Customer Code|      Premise Code|   Meter ID|Service Type|Channel Number|Power Flow|   Unit|Interval Length|Time Zone|     Start Date Time|       End Date Time|          Start Date|Start Time|            End Date|End Time|Time of Use| Read Type|              Usage|Day of the Week|   Hour of the Day|  Cost|
+-------+-----------------+------------------+--------------------+--------------------+-------------+------------------+-----------+------------+--------------+----------+-------+---------------+---------+--

In [0]:
df_sample.describe().show()

+-------+------------------+------------------+--------------------+--------------------+-------------+------------------+-----------+------------+--------------+----------+-------+---------------+---------+--------------------+--------------------+--------------------+----------+--------------------+--------+-----------+----------+-------------------+---------------+------------------+-----+
|summary|             index|              Name|                 Loc|           Account #|Customer Code|      Premise Code|   Meter ID|Service Type|Channel Number|Power Flow|   Unit|Interval Length|Time Zone|     Start Date Time|       End Date Time|          Start Date|Start Time|            End Date|End Time|Time of Use| Read Type|              Usage|Day of the Week|   Hour of the Day| Cost|
+-------+------------------+------------------+--------------------+--------------------+-------------+------------------+-----------+------------+--------------+----------+-------+---------------+---------+-

In [0]:
# Tratamento: Separando a coluna Start Date em Day, Month e Year
df_spark = df_spark.withColumn("Day", df_spark["Start Date"].substr(0,2).cast("integer"))\
            .withColumn("Month", df_spark["Start Date"].substr(4,2).cast("integer"))\
            .withColumn("Year", df_spark["Start Date"].substr(7,4).cast("integer"))

In [0]:
# Tratamento do sample: Separando a coluna Start Date em Day, Month e Year
df_sample = df_sample.withColumn("Day", df_sample["Start Date"].substr(0,2).cast("integer"))\
            .withColumn("Month", df_sample["Start Date"].substr(4,2).cast("integer"))\
            .withColumn("Year", df_sample["Start Date"].substr(7,4).cast("integer"))

In [0]:
# Verificar os valores distintos da coluna Read Type
df_spark.select('Read Type').distinct().collect()

Out[26]: [Row(Read Type='Actual'), Row(Read Type='Calculated')]

In [0]:
# Tratamento: Selecionar colunas que serão mantidas

columns_to_keep = [
    "Name",
    "Account #",
    "Premise Code", 
    "Meter ID",
    "Read Type",
    "Hour of the Day",    
    "Usage",
    "Day of the Week",
    "Day",
    "Month",
    "Year",
]
#Selecionar colunas que continuarão no dataframe total
df_spark_tratamento = df_spark.select(columns_to_keep)

# Print do dataframe para verificação após o tratamento
display(df_spark_tratamento)

Name,Account #,Premise Code,Meter ID,Read Type,Hour of the Day,Usage,Day of the Week,Day,Month,Year
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.926,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.927,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.925,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.926,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,1,0.927,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,1,0.925,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,1,0.925,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,1,0.927,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,2,0.927,Wed,10,8,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,2,0.927,Wed,10,8,2014


In [0]:
#Selecionar colunas que continuarão no dataframe sample
df_sample_tratamento = df_sample.select(columns_to_keep)

display(df_sample_tratamento)

Name,Account #,Premise Code,Meter ID,Read Type,Hour of the Day,Usage,Day of the Week,Day,Month,Year
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,22,0.932,Mon,10,13,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,15,0.0,Wed,10,15,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,7,0.0,Thu,10,16,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,3,0.93,Sun,10,19,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,9,0.0,Mon,10,20,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,15,0.0,Mon,10,20,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,2,0.933,Wed,10,22,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,5,0.939,Thu,10,23,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.934,Sat,10,25,2014
CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,18,0.94,Sun,10,26,2014


In [0]:
#Transformar de pyspark para pandas
pandasDF = df_spark_tratamento.toPandas()

In [0]:
#Transformar de pyspark para pandas
pandas_sample = df_sample_tratamento.toPandas()

In [0]:
pandasDF

Unnamed: 0,Name,Account #,Premise Code,Meter ID,Read Type,Hour of the Day,Usage,Day of the Week,Day,Month,Year
0,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.926,Wed,10,8,2014
1,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.927,Wed,10,8,2014
2,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.925,Wed,10,8,2014
3,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,0,0.926,Wed,10,8,2014
4,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,1,0.927,Wed,10,8,2014
...,...,...,...,...,...,...,...,...,...,...,...
1047400,CITY OF LAS VEGAS,3000100090115545359,1554535,CC029790952,Actual,22,0.258,Wed,10,12,2016
1047401,CITY OF LAS VEGAS,3000100090115545359,1554535,CC029790952,Actual,23,0.257,Wed,10,12,2016
1047402,CITY OF LAS VEGAS,3000100090115545359,1554535,CC029790952,Actual,23,0.258,Wed,10,12,2016
1047403,CITY OF LAS VEGAS,3000100090115545359,1554535,CC029790952,Actual,23,0.258,Wed,10,12,2016


In [0]:
# Verificação de integridade dos dados 
pandas_sample.count()

Out[30]: Name               8354
Account #          8354
Premise Code       8354
Meter ID           8354
Read Type          8354
Hour of the Day    8354
Usage              8354
Day of the Week    8354
Day                8354
Month              8354
Year               8354
dtype: int64

In [0]:
# Verificação de integridade dos dados 
pandas_sample

Unnamed: 0,Name,Account #,Premise Code,Meter ID,Read Type,Hour of the Day,Usage,Day of the Week,Day,Month,Year
0,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,22,0.932,Mon,10,13,2014
1,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,15,0.000,Wed,10,15,2014
2,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,7,0.000,Thu,10,16,2014
3,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,3,0.930,Sun,10,19,2014
4,CITY OF LAS VEGAS,3000100090110047880,1004788,CC029951602,Actual,9,0.000,Mon,10,20,2014
...,...,...,...,...,...,...,...,...,...,...,...
8349,CITY OF LAS VEGAS,3000100090115545359,1554535,CC029790952,Actual,0,0.258,Tue,10,11,2016
8350,CITY OF LAS VEGAS,3000100090115545359,1554535,CC029790952,Actual,14,0.000,Tue,10,11,2016
8351,CITY OF LAS VEGAS,3000100090115545359,1554535,CC029790952,Actual,21,0.258,Tue,10,11,2016
8352,CITY OF LAS VEGAS,3000100090115545359,1554535,CC029790952,Actual,2,0.258,Wed,10,12,2016


In [0]:
# Criação do Schema definido pelo time

from pyspark.sql.types import LongType, StringType, StructType, StructField, IntegerType, TimestampType, FloatType

userDefinedSchema = StructType(
  [
  StructField("Name",StringType()),
  StructField("Account #", LongType()),
  StructField("Premise Code", LongType()),
  StructField("Meter ID", StringType()),
  StructField("Read Type", StringType()),
  StructField("Hour of the Day", IntegerType()),
  StructField("Usage", FloatType()),
  StructField("Day of the Week", StringType()), 
  StructField("Day", IntegerType()),
  StructField("Month", IntegerType()),
  StructField("Year", IntegerType()),
  ]
)

In [0]:
# Criação do dataframe criado com o schema definido pelo time
sparkDF_tratamento = spark.createDataFrame(pandasDF,schema=userDefinedSchema)
sparkDF_tratamento.printSchema()


root
 |-- Name: string (nullable = true)
 |-- Account #: long (nullable = true)
 |-- Premise Code: long (nullable = true)
 |-- Meter ID: string (nullable = true)
 |-- Read Type: string (nullable = true)
 |-- Hour of the Day: integer (nullable = true)
 |-- Usage: float (nullable = true)
 |-- Day of the Week: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)



In [0]:
# Criação do dataframe criado com o schema definido pelo time
sparkDF_sample_tratamento = spark.createDataFrame(pandas_sample,schema=userDefinedSchema)
sparkDF_sample_tratamento.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Account #: long (nullable = true)
 |-- Premise Code: long (nullable = true)
 |-- Meter ID: string (nullable = true)
 |-- Read Type: string (nullable = true)
 |-- Hour of the Day: integer (nullable = true)
 |-- Usage: float (nullable = true)
 |-- Day of the Week: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)



In [0]:
# Verificação dos dados
sparkDF_tratamento.show()

+------------------+-------------------+------------+-----------+---------+---------------+-----+---------------+---+-----+----+
|              Name|          Account #|Premise Code|   Meter ID|Read Type|Hour of the Day|Usage|Day of the Week|Day|Month|Year|
+------------------+-------------------+------------+-----------+---------+---------------+-----+---------------+---+-----+----+
| CITY OF LAS VEGAS|3000100090110047880|     1004788|CC029951602|   Actual|              0|0.926|            Wed| 10|    8|2014|
| CITY OF LAS VEGAS|3000100090110047880|     1004788|CC029951602|   Actual|              0|0.927|            Wed| 10|    8|2014|
| CITY OF LAS VEGAS|3000100090110047880|     1004788|CC029951602|   Actual|              0|0.925|            Wed| 10|    8|2014|
| CITY OF LAS VEGAS|3000100090110047880|     1004788|CC029951602|   Actual|              0|0.926|            Wed| 10|    8|2014|
| CITY OF LAS VEGAS|3000100090110047880|     1004788|CC029951602|   Actual|              1|0.927|

Parte de Machine Learning

In [0]:
# Treinamento e teste do modelo
train_df, test_df = sparkDF_tratamento.randomSplit([.8, .2], seed=42)

In [0]:
# Treinamento e teste do modelo sample
train_sample_df, test_sample_df = sparkDF_sample_tratamento.randomSplit([.8, .2], seed=42)

In [0]:
# OneHotEncoder para separação de colunas strings em colunas categóricas 
from pyspark.ml.feature import OneHotEncoder, StringIndexer

categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]
index_output_cols = [x + "Index" for x in categorical_cols]
ohe_output_cols = [x + "OHE" for x in categorical_cols]

string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")
ohe_encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)

In [0]:
# OneHotEncoder para separação de colunas strings em colunas categóricas do sample
from pyspark.ml.feature import OneHotEncoder, StringIndexer

categorical_cols_sample = [field for (field, dataType) in train_sample_df.dtypes if dataType == "string"]
index_output_cols_sample = [x + "Index" for x in categorical_cols_sample]
ohe_output_cols_sample = [x + "OHE" for x in categorical_cols_sample]

string_indexer_sample = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")
ohe_encoder_sample = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)

In [0]:
#Função utilizada para criação de vetores
from pyspark.ml.feature import VectorAssembler

numeric_cols = [field for (field, dataType) in train_df.dtypes if ((dataType == "double") & (field != "Usage"))]
assembler_inputs = ohe_output_cols + numeric_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

In [0]:
#Função utilizada para criação de vetores do sample
from pyspark.ml.feature import VectorAssembler

numeric_cols_sample = [field for (field, dataType) in train_sample_df.dtypes if ((dataType == "double") & (field != "Usage"))]
assembler_inputs_sample = ohe_output_cols_sample + numeric_cols_sample
vec_assembler_sample = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

In [0]:
# Regressão linear do dataframe
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="Usage", featuresCol="features")

In [0]:
# Regressão linear do dataframe do sample
from pyspark.ml.regression import LinearRegression

lr_sample = LinearRegression(labelCol="Usage", featuresCol="features")

In [0]:
#Criação do pipeline do dataframe
from pyspark.ml import Pipeline

stages = [string_indexer, ohe_encoder, vec_assembler, lr]
pipeline = Pipeline(stages=stages)

pipeline_model = pipeline.fit(train_df)

In [0]:
#Criação do pipeline do sample
from pyspark.ml import Pipeline

stages_sample = [string_indexer_sample, ohe_encoder_sample, vec_assembler_sample, lr_sample]
pipeline_sample = Pipeline(stages=stages)

pipeline_model_sample = pipeline.fit(train_df)

In [0]:
#Salvando modelos
path = "dbfs:/FileStore/shared_uploads/lucasldfm@gmail.com/modelo"


pipeline_model.write().overwrite().save(path)

In [0]:
#Salvando modelos do sample
path = "dbfs:/FileStore/shared_uploads/lucasldfm@gmail.com/modelo_sample"


pipeline_model_sample.write().overwrite().save(path)

In [0]:
#Carregando modelo
from pyspark.ml import PipelineModel

saved_pipeline_model = PipelineModel.load(path)

In [0]:
#Carregando modelo do sample
from pyspark.ml import PipelineModel

saved_pipeline_model_sample = PipelineModel.load(path)

In [0]:
#Aplicação do modelo
pred_df = saved_pipeline_model.transform(test_df)

display(pred_df.select("features", "Usage", "prediction"))

features,Usage,prediction
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 16), values -> List(1.0, 1.0, 1.0, 1.0))",0.926,0.4622770919135476
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 19), values -> List(1.0, 1.0, 1.0, 1.0))",0.928,0.4618774028983755
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 20), values -> List(1.0, 1.0, 1.0, 1.0))",0.929,0.4613836705872395
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 19), values -> List(1.0, 1.0, 1.0, 1.0))",0.929,0.4618774028983755
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15), values -> List(1.0, 1.0, 1.0))",0.93,0.4617247951856355
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 16), values -> List(1.0, 1.0, 1.0, 1.0))",0.93,0.4622770919135476
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 17), values -> List(1.0, 1.0, 1.0, 1.0))",0.931,0.4630852823891078
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 18), values -> List(1.0, 1.0, 1.0, 1.0))",0.931,0.4628497724361411
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 21), values -> List(1.0, 1.0, 1.0, 1.0))",0.932,0.4614000870729768
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 16), values -> List(1.0, 1.0, 1.0, 1.0))",0.932,0.4622770919135476


In [0]:
#Aplicação do modelo do sample
pred_df_sample = saved_pipeline_model_sample.transform(test_sample_df)

display(pred_df_sample.select("features", "Usage", "prediction"))

features,Usage,prediction
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15), values -> List(1.0, 1.0, 1.0))",0.935,0.4617247951856355
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 21), values -> List(1.0, 1.0, 1.0, 1.0))",0.943,0.4614000870729768
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 18), values -> List(1.0, 1.0, 1.0, 1.0))",0.945,0.4628497724361411
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 18), values -> List(1.0, 1.0, 1.0, 1.0))",0.954,0.4628497724361411
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 21), values -> List(1.0, 1.0, 1.0, 1.0))",0.924,0.4614000870729768
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 21), values -> List(1.0, 1.0, 1.0, 1.0))",0.943,0.4614000870729768
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15), values -> List(1.0, 1.0, 1.0))",0.946,0.4617247951856355
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 21), values -> List(1.0, 1.0, 1.0, 1.0))",0.951,0.4614000870729768
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 19), values -> List(1.0, 1.0, 1.0, 1.0))",0.931,0.4618774028983755
"Map(vectorType -> sparse, length -> 22, indices -> List(0, 2, 15, 16), values -> List(1.0, 1.0, 1.0, 1.0))",0.933,0.4622770919135476
