# Profile dos Dados

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

In [3]:
import pandas as pd
import numpy as np
from pyspark.sql.types import IntegerType
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col

In [4]:
sensor_df = spark.read.option("delimiter", ",").csv('sensor_data.csv', inferSchema = True, header = True)
sensor_df.show()

+------------+-------------------+--------+-------+---------+------------+---------------+--------------------+-----------+--------------+-------------------+
|ambient_temp|           datetime|humidity|    lat|      lng|photo_sensor|radiation_level|           sensor_id|sensor_name|   sensor_uuid|          timestamp|
+------------+-------------------+--------+-------+---------+------------+---------------+--------------------+-----------+--------------+-------------------+
|       16.70|2017-08-30 18:42:45| 76.4517|  36.17|-119.7462|      1003.3|            201|c6698873b4f14b995...| California|probe-2a2515fc|2017-08-30 13:12:45|
|       17.33|2017-08-30 19:02:10| 85.4126|  36.17|-119.7462|      1004.9|            195|c6698873b4f14b995...| California|probe-62e06003|2017-08-30 13:32:10|
|       18.98|2017-08-30 15:31:23| 78.6092|39.0646|-105.3272|      1020.2|            198|2d5831578ea970d0c...|   Colorado|probe-d1bf2004|2017-08-30 10:01:23|
|       17.04|2017-08-30 17:59:14| 83.6593|  3

In [6]:
sensor_df.printSchema()

root
 |-- ambient_temp: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- humidity: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- photo_sensor: double (nullable = true)
 |-- radiation_level: integer (nullable = true)
 |-- sensor_id: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- sensor_uuid: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [7]:
sensor_df = sensor_df.withColumn("ambient_temp", sensor_df["ambient_temp"].cast(IntegerType()))
sensor_df = sensor_df.withColumn("humidity", sensor_df["humidity"].cast(IntegerType()))
sensor_df.printSchema()

root
 |-- ambient_temp: integer (nullable = true)
 |-- datetime: string (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- photo_sensor: double (nullable = true)
 |-- radiation_level: integer (nullable = true)
 |-- sensor_id: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- sensor_uuid: string (nullable = true)
 |-- timestamp: string (nullable = true)

None


In [9]:
sensor_df = sensor_df.withColumn("datetime", sensor_df["datetime"].cast(TimestampType()))
sensor_df = sensor_df.withColumn("timestamp", sensor_df["timestamp"].cast(TimestampType()))
sensor_df.printSchema()

root
 |-- ambient_temp: integer (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- photo_sensor: double (nullable = true)
 |-- radiation_level: integer (nullable = true)
 |-- sensor_id: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- sensor_uuid: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [None]:
sensor_df.show()

In [10]:
def dataprofile(data_all_df,data_cols):
    data_df = data_all_df.select(data_cols)
    columns2Bprofiled = data_df.columns
    dprof_df = pd.DataFrame({'coluna':data_df.columns,\
                             'tipo_dado':[x[1] for x in data_df.dtypes]}) 
    dprof_df = dprof_df[['coluna', 'tipo_dado']]
    # ======================
    num_rows = data_df.count()
    dprof_df['linhas'] = num_rows
    # ======================    
    # Quantidade de Nulos
    df_nacounts = data_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_df.columns \
                                  if data_df.select(c).dtypes[0][1]!='timestamp']).toPandas().transpose()
    df_nacounts = df_nacounts.reset_index()  
    df_nacounts.columns = ['coluna','qtd_nulos']
    dprof_df = pd.merge(dprof_df, df_nacounts, on=['coluna'], how='left')
    # ========================
    # Quantidade de Vazios
    num_spaces = [data_df.where(F.col(c).rlike('^\\s+$')).count() for c in data_df.columns]
    dprof_df['qtd_espacos'] = num_spaces
    num_blank = [data_df.where(F.col(c)=='').count() for c in data_df.columns]
    dprof_df['qtd_brancos'] = num_blank
    # =========================
    # Achar os valores pelo describe 
    desc_df = data_df.describe().toPandas().transpose()
    desc_df.columns = ['total', 'media', 'desvio_padrao', 'min', 'max']
    desc_df = desc_df.iloc[1:,:]  
    desc_df = desc_df.reset_index()  
    desc_df.columns.values[0] = 'coluna'  
    desc_df = desc_df[['coluna','total', 'media', 'desvio_padrao']] 
    dprof_df = pd.merge(dprof_df, desc_df, on = ['coluna'], how = 'left')
    # ===========================================
    # Maximos e minimos valores
    allminvalues = [data_df.select(F.min(x)).limit(1).toPandas().iloc[0][0] for x in columns2Bprofiled]
    allmaxvalues = [data_df.select(F.max(x)).limit(1).toPandas().iloc[0][0] for x in columns2Bprofiled]
    df_counts = dprof_df[['coluna']]
    df_counts.insert(loc=0, column='min', value=allminvalues)
    df_counts.insert(loc=0, column='max', value=allmaxvalues)
    df_counts = df_counts[['coluna','min','max']]
    dprof_df = pd.merge(dprof_df, df_counts , on = ['coluna'], how = 'left')  
    # ==========================================
    # Total de Valores distintos de cada coluna
    dprof_df['num_val_distintos'] = [data_df.select(x).distinct().count() for x in columns2Bprofiled]
    # ============================================
    # Maior frequencia
    dprof_df['maior_freq_valwcount'] = [data_df.groupBy(x).count().sort("count",ascending=False).limit(1).\
                                       toPandas().iloc[0].values.tolist() for x in columns2Bprofiled]
    dprof_df['maior_val_freq'] = [x[0] for x in dprof_df['maior_freq_valwcount']]
    dprof_df['maior_val_freq_total'] = [x[1] for x in dprof_df['maior_freq_valwcount']]
    dprof_df = dprof_df.drop(['maior_freq_valwcount'],axis=1)
    # Menor frequencia
    dprof_df['menor_freq_valwcount'] = [data_df.groupBy(x).count().sort("count",ascending=True).limit(1).\
                                        toPandas().iloc[0].values.tolist() for x in columns2Bprofiled]
    dprof_df['menor_val_freq'] = [x[0] for x in dprof_df['menor_freq_valwcount']]
    dprof_df['menor_val_freq_total'] = [x[1] for x in dprof_df['menor_freq_valwcount']]
    dprof_df = dprof_df.drop(['menor_freq_valwcount'],axis=1)

    return dprof_df

In [11]:
colunas = sensor_df.columns
colunas

['ambient_temp',
 'datetime',
 'humidity',
 'lat',
 'lng',
 'photo_sensor',
 'radiation_level',
 'sensor_id',
 'sensor_name',
 'sensor_uuid',
 'timestamp']

In [12]:
dprofile = dataprofile(sensor_df, colunas)
dprofile

Unnamed: 0,coluna,tipo_dado,linhas,qtd_nulos,qtd_espacos,qtd_brancos,total,media,desvio_padrao,min,max,num_val_distintos,maior_val_freq,maior_val_freq_total,menor_val_freq,menor_val_freq_total
0,ambient_temp,int,597991,3370.0,0,0,594621.0,20.48431858276112,5.435751405402511,0,59,54,21,49157.0,59,3.0
1,datetime,timestamp,597991,,0,0,,,,2017-08-30 15:29:31,2017-08-30 19:15:37,13560,2017-08-30 17:40:13,97.0,2017-08-30 15:29:31,1.0
2,humidity,int,597991,345.0,0,0,597646.0,79.50915759496424,4.0166357085189865,62,96,36,80,59162.0,62,5.0
3,lat,double,597991,0.0,0,0,597991.0,33.82808582191484,3.959843393123388,27.8333,39.0646,5,34.9513,122087.0,31.106,115999.0
4,lng,double,597991,0.0,0,0,597991.0,-99.17472767612324,12.722724414328988,-119.746,-81.717,5,-92.3809,122087.0,-97.6475,115999.0
5,photo_sensor,double,597991,0.0,0,0,597991.0,798.7008339590398,60.16172463387437,7,1022.9,23372,8,760.0,876.72,1.0
6,radiation_level,int,597991,0.0,0,0,597991.0,199.5054607845269,2.018486742651516,190,208,19,200,114714.0,190,5.0
7,sensor_id,string,597991,14.0,0,0,597977.0,,,24408ce3f09b31f9d3454ee6ea81bb63,e22db77d29211b5bdc0eadc5098a28f7,6,e22db77d29211b5bdc0eadc5098a28f7,122083.0,,14.0
8,sensor_name,string,597991,0.0,0,0,597991.0,,,Arkansas,Texas,5,Arkansas,122087.0,Texas,115999.0
9,sensor_uuid,string,597991,0.0,0,0,597991.0,,,probe-0002ae11,probe-ffff62ad,122866,probe-bb4205e3,11.0,probe-d3d1d568,1.0


In [13]:
spark.sparkContext.stop()