# Exploración de datos para la colección de análisis de imágenes

En el presente notebook se exploran los datos de la coleccion <i>phis_imagenanalysesangle_aggregated_by_day</i>, esta coleccion se encuentra en MongoDB y es el resultado de un preprocesamiento realizado con Pipelines de MongoDB. Este proceso se hizo como tranformación prevía para mejorar el rendimiento y reducir el dataset. La colección contiene documentos(registros),  cada documento de la colección contiene todos los datos que se recogieron de una variables en un solo dia, es decir, que los datos por horas fueron resumidos, en el registro se identifica el URI de la planta dentro del experimento y el seguimiento de cada variable. Para propósito de este caso de estudio los datos se agregan con la función promedio. 


## Configuración inicial 

En el siguiente bloque se cargan las librerías para hacer uso de pyspark, pyspark permite manipular el contexto de spark dentro de un programa de python, por ser este un contexto interactivo(Jupyter) se agrega la línea <i>findspark.init()</i>. Entre las configuraciones se destacan: la inclusión del conector mongodb para cargar colecciones, y la línea <i>.master('local[*]')</i> para habilitar en el contexto de pyspark la disponibilidad de todos los procesadores de la máquina

*Nota: Previo a esta ejecución se debió descargar el driver del conector de mongo-spark la versión exacta se encuentra en el código

In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

MONGO_URI="mongodb://localhost:27017/iot_db" 
my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .master('local[*]')\
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2")\
    .config("spark.mongodb.input.uri", MONGO_URI+".phis_experiments") \
    .getOrCreate()

#.config("spark.mongodb.input.uri", "mongodb://localhost:27017/iot_db.performanceDataset") \
#.config("spark.mongodb.output.uri", "mongodb://localhost:27017/iot_db.result") \

def getTableData(collection) : 
    return my_spark.read.format("com.mongodb.spark.sql.DefaultSource")\
                                            .option("database","iot_db")\
                                            .option("collection", collection)\
                                            .load()

#my_spark.sparkContext.getConf().getAll()
    

## Carga de experimentos

En esta sesión se cargan los experimentos con el fin de tener en memoria una lista con los experimentos de la base de datos, un experimento es la entidad agrupadora de un grupo de plantas, o de el seguimiento en campo de varias áreas plantadas, todo el notebook está pensado para mirar solamente sobre un experimento



In [2]:
df_experiments = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df_experiments=df_experiments.select('experimentURI')
df_experiments.show()


AnalysisException: "cannot resolve '`experimentURI`' given input columns: [];;\n'Project ['experimentURI]\n+- Relation[] MongoRelation(MongoRDD[0] at RDD at MongoRDD.scala:51,Some(StructType()))\n"

## Carga de la colección imagenanalysesangle_aggregated_by_day

En este punto se hace  un llamado a una función ya definida que permite cargar una colección de mongodb a un dataframe, después se proyectan los atributos de interés, finalmente se hace uso de la función <b>pivot</b> de pyspark que permite convertir cada valor distinto dentro de una columna en otra columna, cambiando así la forma del dataset.

Dado que <i>variableCode</i> contiene una lista arbitraria de nombres de variables esta línea de código aporta flexibilidad y hace una reestructuración automática del dataset, la forma de la coleccion de entrada está pensada para facilitar la ejecución de este método.



In [None]:
df_images_anaysis = getTableData( "phis_imagenanalysesangle_aggregated_by_day")

print("Se pueden observar algunas estadísticas")
print("# de registros",df_images_anaysis.count())
print("# de Plantas",df_images_anaysis.select("plantURI").distinct().count())
print("Variables iniciales")
df_images_anaysis.select("variableCode").distinct().show()

your_min_value = df_images_anaysis.agg({"dayOfYear": "min"}).collect()[0][0]
print("Initial Day:",your_min_value)
your_max_value = df_images_anaysis.agg({"dayOfYear": "max"}).collect()[0][0]
print("End Day:", your_max_value)

print("Se pueden observar los valores de la columna variableCode")
df_images_anaysis.select("plantURI","dayOfYear", "variableCode", "variableCodeValueAvg").show(20)



In [None]:
df_images_anaysis = df_images_anaysis.select("plantURI", "dayOfYear", "variableCode", "variableCodeValueAvg")
df_grp_img_analysis = df_images_anaysis.groupBy("plantURI", "dayOfYear").pivot("variableCode").mean("variableCodeValueAvg")

print("Se pueden observar dos de las nuevas columnas")
df_grp_img_analysis.select("plantURI", "dayOfYear","height_over_pot","convex_hull_area").show()


* EL tiempo de ejecución de la función pivot fue de 56s y se ejecutaron 200 tareas en spark

In [None]:
#df_grp_img_analysis.describe().show(truncate=False, vertical=True)
import numpy as np 
import pandas as pd

pd.DataFrame()
def describe_pd(df_in, columns, deciles=False):
    '''
    Function to union the basic stats results and deciles
    :param df_in: the input dataframe
    :param columns: the cloumn name list of the numerical variable
    :param deciles: the deciles output

    :return : the numerical describe info. of the input dataframe

    :author: Ming Chen and Wenqiang Feng
    :email:  von198@gmail.com
    '''

    if deciles:
        percentiles = np.array(range(0, 110, 10))
    else:
        percentiles = [0.25, 0.5,0.75]

    error=0.1
    percs = np.transpose([df_in.approxQuantile(x,percentiles, error) for x in columns])
    percs = pd.DataFrame(percs, columns=columns)
    percs['summary'] = [str(p) + '%' for p in percentiles]

    spark_describe = df_in.describe().toPandas()
    new_df = pd.concat([spark_describe, percs],ignore_index=True, sort=True)
    new_df = new_df.round(2)
    return new_df[['summary'] + columns]

## Descripción Estadística de las Columnas Numéricas

El dataset formado de filas y columnas se encuentra en el contexto de pyspark, en el código de arriba se implementa una función para imprimir datos estadísticos de cada variable. La función describe() de spark retorna algunos datos estadísticos importantes sin embargo no muestra los cuartiles, es por ello que se debe usar approxQuantile una función propia de spark que recibe la columna, el percentil de interés y una fracción de error permitido, puesto que para cantidades de datos elevadas el cálculo es aproximado. Es de notar que la función approxQuantile es una función nativa de pyspark, a pesar de ser llamada dentro de python el grueso de su ejecución tendrá lugar dentro de la JVM(Java Virtual Machine). La recomendación es utilizar con prioridad las funciones propias de spark y no combinarlo tanto con numpy o pandas, a menos de que ser necesario, aunque las librerías de python tengan mejor documentación y mayor cantidad de funciones. Esta ejecución es exhaustiva.


In [None]:

#df_grp_img_analysis_sample= df_grp_img_analysis.sample()
num_cols= ['dayOfYear','Silk_area','convex_hull_area','convex_hull_perimeter','height','height_over_pot','height_under_pot','number_of_objects','object_sum_area','width']


df_stats=describe_pd(df_grp_img_analysis, num_cols)

df_stats
#df_grp_img_analysis_sample.approxQuantile("Silk_area",[0.25, 0.5,0.75], 0.25)


* Este código ejecuta varios llamados de la función approxQuantile uno por variable para calcular: la mediana, q1 y q3, un total de 12 Jobs fueron ejecutados, 417 tareas por job, el tiempo de ejecución para cada job fue de 1min aprox.

## Box plot y big dataset 

En el siguiente código se va a crear un pequeño dataset para visualizar un Box Plot representativo de los datos del big dataset que se encuentra en spark, dado que no es fácil instanciar una herramienta de visualización con el big dataset lo que se hizo  fue calcular los estadísticos necesarios y crear un dataset con la forma: 

<code> [min,25%, 25% ,median, 75%, 75% ,max]</code>

En base a este formato se crearon los boxplot para cada variable

In [None]:
## reshaping statistics to draw box plot

toDelete=["count", "mean", "stddev"]
tReplicate=["0.25%", "0.75%"]

## remove count mean and std
df_stats_temp= df_stats[~df_stats["summary"].isin(toDelete) ]

## get quartile .75 and .25
df_stats_dup= df_stats[df_stats["summary"].isin(tReplicate) ]

##replicate quartiles
df_stats_temp=df_stats_temp.append(df_stats_dup, ignore_index = True) 

df_stats_temp=df_stats_temp.drop("summary",axis=1)

# df_stats_temp.dtypes
df_stats_temp=df_stats_temp.astype('float')

# df_stats_temp.dtypes

# df_stats_temp=df_stats_temp.sort_values('dayOfYear')

print(df_stats_temp)
print(df_stats_temp.dtypes)
print(df_stats_dup)

## Visualizaciones 

Para la visualización de datos se va a usar la librería plotly, esta es una librería javascript con funcionalidades para python que se puede utilizar dentro de los notebook, ofrece gráficos interactivos y crea diferentes tipo de render como imágenes:png, svg, documentos HTMLs interactivos, entre otros. Esta misma librería se puede usar para crear una aplicación web 

En el siguiente bloque se cargan las librerías de visualización 

In [None]:
## import plotly

import plotly.plotly as pl
import cufflinks as cf
import pandas as pd
import numpy as np
print(cf.__version__)
%matplotlib inline

from plotly.offline import download_plotlyjs, init_notebook_mode,plot,iplot

init_notebook_mode(connected=True)
cf.go_offline()

## Box Plot Graphic

Una vez cargada la librería se presenta esta herramienta visual estadísticas, se usó la funcionalidad de subplots por que cada atributo tiene unidades distintas y aveces se requiere saber el max y el min de una variable, y reconocer donde se encuentra la mayor densidad.



In [None]:
#x=[min, q1, q1, median, q3, q3, max]

#normalized_df=(df_stats_temp-df_stats_temp.mean())/df_stats_temp.std()
df_stats_temp.iplot(kind="box",subplots=True, shape=(2,5), shared_xaxes=True, fill=True,)


## Muestreo de Datos 

En el siguiente bloque se retorna un muestreo aleatorio del dataset grande  para visualizar los datos, dado que spark no cuenta con una libreria gratuita de visualización esta es una forma util y eficiente de visualizar los datos y justifica el uso de python. La función recibe tres parametros un boolean que indica que no actualice el dataframe original, una porcion deseada, una semilla de aleatoriedad.


In [None]:
#https://gist.github.com/cameres/bc24ac6711c9e537dd20be47b2a83558

from pyspark.mllib.stat import Statistics
import pandas as pd


df_view_vars_temp= df_grp_img_analysis.sample(False, 0.1, 12345).toPandas()



* Este código tardó aprox. 1 min y se ejecutaron 414 tareas.

## Visualización por planta

La siguiente función toma 3 plantas de la lista de plantas y para cada una hace una gráfica con todas las variables de estudio, estas variables fueron normalizadas para que compartieran los mismo ejes, ver tendencias conjuntas y correlaciones

In [None]:
print(df_view_vars_temp.columns)
df_view_vars= df_view_vars_temp#.set_index(["plantURI","dayOfYear"])

#df_view_vars=df_view_vars.sort_index()



#listof_plants= df_view_vars.index.get_level_values('plantURI').unique().tolist()
listof_plants= df_view_vars['plantURI'].unique().tolist()

print("Number of plants",len(listof_plants))

listof_plants_first_3=listof_plants[:3]


# #df_view_vars.iplot(kind='scatter', filename='cufflinks/cf-simple-line')

# #from plotly.subplots import make_subplots


for plant in listof_plants_first_3:
    dataDf=df_view_vars[df_view_vars['plantURI']==plant]
    dataDf.index= dataDf["dayOfYear"]
    dataDf=dataDf.sort_index()
    dataDf=dataDf.drop(['plantURI', 'dayOfYear','Silk_area'], axis=1)
    
    dataDf=(dataDf-dataDf.min())/dataDf.max()  # normalization
    dataDf=dataDf.round(2) 
    
    print('Plant URI:',plant)
    dataDf.iplot(kind='scatter', filename='cufflinks/cf-simple-line')
    #print(dataDf.head(20))
    
    


## Visualización: Plantas vs Variable

En las siguientes se toman algunas variables y se elabora un plot con cada una de ellas, el plot contiene el comportamienta de esa variable para cada planta, por razones de rendimiento se hacen visible las primeras diez plantas, es posible buscar la planta de interes y dar click sobre el legend para mostrarla o desvanecerla


In [None]:
## a graphic by variable for all the plants
import plotly.graph_objs as go

df_view_one_var=df_view_vars_temp

columns= ["convex_hull_area", "width", "height","number_of_objects"]

#columns= [ "width"]



layout = go.Layout(
    title=go.layout.Title(
        text='Plot Title',
        xref='paper',
        x=0
    ),
    xaxis=go.layout.XAxis(
        title=go.layout.xaxis.Title(
            text='Time',
            font=dict(
                family='Courier New, monospace',
                size=18,
                color='#7f7f7f'
            )
        )
    ),
    yaxis=go.layout.YAxis(
        title=go.layout.yaxis.Title(
            text='Amount',
            font=dict(
                family='Courier New, monospace',
                size=18,
                color='#7f7f7f'
            )
        )
    )
)

for col in columns:
    plt_settings=[]
    for index, plant in enumerate(listof_plants):
        dataDf=df_view_one_var[df_view_one_var["plantURI"]==plant]
        dataDf.index= dataDf["dayOfYear"]
        dataDf=dataDf.sort_index()
        
        visibility = "legendonly" if index > 10 else True
        config= {'x':dataDf.index, 'y':dataDf[col], 'name':plant , 'visible':visibility}
        plt_settings.append(config)
    # plot the graphics
    layout.title.text="All Plants Behavior in Time for "+col
    fig = go.Figure(data=plt_settings, layout=layout)
    iplot( fig, filename='cufflinks/simple-line')
    



## Análisis de la gráfica

A partir de este gráfico se pueden sacar algunas conclusiones, en general no todas las plantas tiene seguimiento para todos los días del experimentos es por ello que se observan lineas que no llegan hasta el último día.

* <b>Convext Hull Area:</b>  esta variable es el cálculo del área del poligono más pequeno que contiene todos los puntos de la imagen, esta variable muestra una pendiente positiva lo que se relaciona con el crecimiento de la planta, se observa como la variabilidad se empieza a notar desde el día 124, es en este punto que cada planta demuestra su potencial en cuanto a crecimiento.

* <b>Width:</b> Esta variable representa el máximo ancho de la planta la distancía entre la hoja más lejana a la izquierda y a la derecha, se observa una tendencia positiva pero en ocasiones tambien se evidencia que no hay crecimiento sino que se mantiene un ancho en el tiempo, aveces se observa un decenso de la linea pero este no es tan brusco, la planta puede crecer y las hojas cambiar de posición

* <b>Height:</b> Esta variable representa el punto más alto de la planta medido desde el punto más alto de la matera, a diferencia del width este valor muestra una tendencia positiva constante no es normal que la planta pierda alto durante el crecimiento almenos para este cultivar

* <b>Number of objects:</b> Esta variable representa el número de hojas, su exactitud es afectada fuertemente por la calidad del algoritmo para el conteo, esta es una tarea de procesamiento de imágenes que no es fácil se observan picos altos y descensos en toda la gráfica

## Información Faltante

Algunas plantas no llevaron seguimiento los 73 días del experimento la siguiente gráfica sirve para entender el grueso de las plantas cuantos días tuvieron seguimiento

In [None]:
# Count nulls
from pyspark.sql.functions import count

df_nulls=df_grp_img_analysis.groupBy('plantURI').agg(count("*").alias("RecordsNumber")).toPandas()

* El tiempo de ejecución de este Job fue de 3min

In [None]:
df_nulls["RecordsNumber"].iplot(kind='histogram', filename='cufflinks/basic-histogram')

Este histograma indica el número de registros por planta. Se evidencía con este histograma que el experimento se llevo a cabo en dos etapas, y eso podría explicar la duración distinta de cada siguimiento.