## Proyecto BIG DATA: Google Merchandise Store

Daniela Gonzalez

Andrés Forero

Daniel Cuellar

## Librerías

In [4]:
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pymongo

from pprint import pprint

from ydata_profiling import ProfileReport

from dask_mongo import read_mongo, to_mongo

## Contexto

Las bases de datos usadas se obtuvieron de [Kaggle](https://www.kaggle.com/competitions/ga-customer-revenue-prediction/overview). Comprende un conjunto de datos de clientes de Google Merchandise Store, usados para predecir los ingresos por cliente.

## Descripción de los datos

El conjunto de datos contiene varios archivos csv para realizar los modelos de predicción, para el reto es necesario utilizar dos conjuntos de datos un para el entrenamiento y otro para la prueba. 
- train_v2.csv: El conjunto de datos contiene transacciones de usuario desde el 1 de agosto de 2016 hasta el 30 de abril de 2018.
- test_v2.csv: El conjunto de datos contiene transacciones de usuario desde el 1 de mayo de 2018 hasta el 15 de octubre de 2018.

<h3>Variables</h3>

- fullVisitorId- Identificador único para cada usuario de Google Merchandise Store.
- channelGrouping - Canal a través del cual el usuario llegó a la Tienda.
- date - Fecha en la que el usuario visitó la Tienda.
- device - Especificaciones del dispositivo utilizado para acceder a la tienda
- geoNetwork - Información sobre la geografía del usuario.
- socialEngagementType - Tipo de compromiso, ya sea "Socialmente comprometido" o "No socialmente comprometido". 
- totals - Valores agregados a lo largo de la sesión. 
- trafficSource - Información sobre la fuente de tráfico desde la que se originó la sesión.
- visitId -  Identificador para la sesión.
- visitNumber - Número de sesión de este usuario
- visitStartTime - Marca de tiempo
- hits - Proporciona un registro de todas las visitas a la página.
- customDimensions - Contiene las dimensiones personalizadas a nivel de usuario o de sesión que se establecen para una sesión
- totals - Incluye principalmente datos agregados de alto nivel

## 1 Objetivos

- Objetivo general

    - Aplicar herramientas adecuadas para la administración y procesamiento eficiente de grandes volúmenes de datos, con el fin de optimizar el almacenamiento, la manipulación y el análisis de la información, brindando así una base sólida para la toma de decisiones fundamentada en datos. 

<br>



- Objetivos especificos

    - Utilizar MongoDB para almacenar y procesar la base de datos Google Merchandise Store, para una mejor gestión en las consultas y rendimiento en las solicitudes.

    - Implementar Dask para conectar y trabajar con los datos almacenados en MongoDB, que permita aprovechar las capacidades de procesamiento de Dask en combinación con la flexibilidad de MongoDB, utilizando estrategias de particionamiento y ejecución distribuida.



## 2 Creación de base de datos en Mongo

Para manejar los datasets de entrenamiento y prueba con un peso combinado de 30,7 GB, y al poseer estructuras documentales dentro de la misma, se optó por cargarlos en una base local con Mongo.

<img src = "https://drive.google.com/uc?id=1mUg8y2gjhUSBE7TVIOqKConA4ubhYQze" alt = "Encabezado MLDS" width = "80%">  </img>   

De esta manera podemos aprovechar la interfaz para vistas rápidas, o incluso generar esquemas.

<img src = "https://drive.google.com/uc?id=1wN7_0YhbzPEa-Eqp5yUbFM0qYuw5x0Rl" alt = "Encabezado MLDS" width = "80%">  </img>  

## 3 Conexión a la base de datos de Mongo

In [3]:
client = pymongo.MongoClient("mongodb://localhost:27017")
bd= client["Proyecto_MOD_3"]
collection_Test = bd["Test"]
collection_Train = bd["Train"]


### Muestra de la base

In [13]:
pprint(collection_Test.find_one({}))

{'_id': ObjectId('648f8586aec8990bd2ff8e99'),
 'channelGrouping': 'Organic Search',
 'customDimensions': "[{'index': '4', 'value': 'APAC'}]",
 'date': 20180511,
 'device': {'browser': 'Chrome',
            'browserSize': 'not available in demo dataset',
            'browserVersion': 'not available in demo dataset',
            'deviceCategory': 'mobile',
            'flashVersion': 'not available in demo dataset',
            'isMobile': True,
            'language': 'not available in demo dataset',
            'mobileDeviceBranding': 'not available in demo dataset',
            'mobileDeviceInfo': 'not available in demo dataset',
            'mobileDeviceMarketingName': 'not available in demo dataset',
            'mobileDeviceModel': 'not available in demo dataset',
            'mobileInputSelector': 'not available in demo dataset',
            'operatingSystem': 'Android',
            'operatingSystemVersion': 'not available in demo dataset',
            'screenColors': 'not availab

In [14]:
pprint(collection_Train.find_one({}))

{'_id': ObjectId('648f86efaec8990bd205af51'),
 'channelGrouping': 'Organic Search',
 'customDimensions': "[{'index': '4', 'value': 'EMEA'}]",
 'date': 20171016,
 'device': {'browser': 'Firefox',
            'browserSize': 'not available in demo dataset',
            'browserVersion': 'not available in demo dataset',
            'deviceCategory': 'desktop',
            'flashVersion': 'not available in demo dataset',
            'isMobile': False,
            'language': 'not available in demo dataset',
            'mobileDeviceBranding': 'not available in demo dataset',
            'mobileDeviceInfo': 'not available in demo dataset',
            'mobileDeviceMarketingName': 'not available in demo dataset',
            'mobileDeviceModel': 'not available in demo dataset',
            'mobileInputSelector': 'not available in demo dataset',
            'operatingSystem': 'Windows',
            'operatingSystemVersion': 'not available in demo dataset',
            'screenColors': 'not avai

### Limpieza de datos

Cantidad de documentos con 'customDimensions' = []

In [11]:
N_doc_Test = collection_Test.count_documents({})
N_doc_Train = collection_Train.count_documents({})

print(f"""Base de datos conformada por:\n {N_doc_Test} para Test \n {N_doc_Train} para Train""")

Base de datos conformada por:
 401589 para Test 
 1708337 para Train


Dado que hay campos con estructura JSON pero tomandos como lista:

In [12]:
pprint(collection_Test.find_one({}, {"customDimensions":1,"hits":1}))

{'_id': ObjectId('648f8586aec8990bd2ff8e99'),
 'customDimensions': "[{'index': '4', 'value': 'APAC'}]",
 'hits': "[{'hitNumber': '1', 'time': '0', 'hour': '21', 'minute': '29', "
         "'isInteraction': True, 'page': {'pagePath': '/home', 'hostname': "
         "'shop.googlemerchandisestore.com', 'pageTitle': 'Home', "
         "'pagePathLevel1': '/home', 'pagePathLevel2': '', 'pagePathLevel3': "
         "'', 'pagePathLevel4': ''}, 'appInfo': {'screenName': "
         "'shop.googlemerchandisestore.com/home', 'landingScreenName': "
         "'shop.googlemerchandisestore.com/home', 'exitScreenName': "
         "'shop.googlemerchandisestore.com/home', 'screenDepth': '0'}, "
         "'exceptionInfo': {'isFatal': True}, 'eventInfo': {'eventCategory': "
         "'Enhanced Ecommerce', 'eventAction': 'Promotion Click'}, 'product': "
         "[], 'promotion': [{'promoId': 'Category Row 2', 'promoName': "
         "'Accessories', 'promoCreative': 'toy.png', 'promoPosition': "
         "'C

Se revisan valores vacíos (No poseen Json dentro de la lista)

In [13]:
def Vacios_json_list(field_name, collection):
    query = {field_name: []}
    count = collection.count_documents(query)
    return count

field_name = 'customDimensions'
print(f"Vacíos en {field_name} de Test: ",Vacios_json_list(field_name, collection_Test))
print(f"Vacíos en {field_name} de Train: ",Vacios_json_list(field_name, collection_Train))

field_name = 'hits'
print(f"Vacíos en {field_name} de Test: ",Vacios_json_list(field_name, collection_Test))
print(f"Vacíos en {field_name} de Train: ",Vacios_json_list(field_name, collection_Train))

Vacíos en customDimensions de Test:  60581
Vacíos en customDimensions de Train:  333235
Vacíos en hits de Test:  58
Vacíos en hits de Train:  1215


In [15]:
query = {"customDimensions" : {'$eq': []},
         "hits" : {'$eq': []}}

collection_Test.delete_many(query)
collection_Train.delete_many(query)

<pymongo.results.DeleteResult at 0x2cb7ebec040>

### Consultas

Muestra de los sistemas operativos usados.

In [9]:
browser= collection_Train.find({"device.browser": "Firefox"},{"device.operatingSystem":1}).limit(5)
for res in browser:
    print(res)

{'_id': ObjectId('648f86efaec8990bd205af51'), 'device': {'operatingSystem': 'Windows'}}
{'_id': ObjectId('648f86efaec8990bd205af65'), 'device': {'operatingSystem': 'Windows'}}
{'_id': ObjectId('648f86efaec8990bd205af85'), 'device': {'operatingSystem': 'Macintosh'}}
{'_id': ObjectId('648f86efaec8990bd205af8e'), 'device': {'operatingSystem': 'Macintosh'}}
{'_id': ObjectId('648f86efaec8990bd205afab'), 'device': {'operatingSystem': 'Windows'}}


Conteo de los navegadores más usados.

In [10]:

pipeline = [
    {"$group": {"_id": "$device.browser", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
    {"$limit": 10}
]

results = collection_Train.aggregate(pipeline)

for result in results:
    print(result['_id'], "-", result['count'])

Chrome - 1173018
Safari - 312165
Firefox - 63845
Internet Explorer - 35474
Android Webview - 34266
Edge - 20543
Samsung Internet - 15792
Opera Mini - 15018
Safari (in-app) - 14207
Opera - 9585


Conteo de visitas por continente.

In [11]:
pipeline = [
    {
        "$group": {
            "_id": "$geoNetwork.continent",
            "visitNumber": { "$sum": "$visitNumber" }
        }
    },
    {
        "$sort": { "_id": 1 }
    }
]

results = collection_Train.aggregate(pipeline)
for result in results:
    print(result["_id"], result["visitNumber"])

(not set) 5670
Africa 42944
Americas 2532496
Asia 713807
Europe 654860
Oceania 39426


Cantidad de visitas por países del continente Americano.

In [12]:
pipeline = [
    {
        "$match": {
            "geoNetwork.continent": "Americas"
        }
    },
    {
        "$group": {
            "_id": "$geoNetwork.country",
            "totalVisits": { "$sum": 1 }
        }
    },
    {
        "$sort": {
            "totalVisits": -1
        }
    },
    {"$limit": 10}
]

results = collection_Train.aggregate(pipeline)
for result in results:
    print(result["_id"], result["totalVisits"])

United States 717217
Canada 51057
Brazil 35432
Mexico 25270
Argentina 10128
Colombia 9434
Peru 9234
Chile 3721
Venezuela 2909
Ecuador 2007


Número de visitas por fecha (limitado a 10 para visualización).

In [13]:
pipeline = [
    {
        "$group": {
            "_id": "$date",
            "visitNumber": { "$sum": "$visitNumber" }
        }
    },
    {
        "$sort": { "_id": 1 }
    },
    {"$limit": 10}
]

results = collection_Train.aggregate(pipeline)
for result in results:
    print(result["_id"], result["visitNumber"])

20160801 6320
20160802 6929
20160803 7233
20160804 6456
20160805 5686
20160806 3080
20160807 2819
20160808 6758
20160809 6256
20160810 6920


Cierre del conector de base de datos

In [14]:
client.close()

## 4 Uso de la base de datos con Dask

Definición del tipo de columnas para leer con Dask

In [2]:
column_types = {'_id':str, 'channelGrouping':str, 'customDimensions':str, 'date':str, 'device':str,
       'fullVisitorId':str, 'geoNetwork':str, 'hits':str, 'socialEngagementType':str, 'totals':str,
       'trafficSource':str, 'visitId':str, 'visitNumber':str, 'visitStartTime':str}

Conexión a la base de Mongo

In [3]:
base_Test = read_mongo(
    connection_kwargs={"host": "mongodb://localhost:27017"},
    database="Proyecto_MOD_3",
    collection="Test",
    chunksize=500,
).to_dataframe(meta=column_types)

base_Train = read_mongo(
    connection_kwargs={"host": "mongodb://localhost:27017"},
    database="Proyecto_MOD_3",
    collection="Train",
    chunksize=500,
).to_dataframe(meta=column_types)


Particiones utilizadas

In [35]:
base_Test.npartitions

804

Dimensiones del dataset resultante

In [36]:
rows_Test = base_Test.shape[0].compute()

In [37]:
rows_Train = base_Train.shape[0].compute()

In [38]:
print(f"""
      Número de filas en Test: {rows_Test}
      Número de filas en Train: {rows_Train}""")


      Número de filas en Test: 401582
      Número de filas en Train: 1708299


Verificando que no existan vacíos en las columnas que parecen tener un Json (customDimensions, hits)

In [11]:
bte_cd_empty = sum(base_Test["customDimensions"]=="[]")
bte_h_empty = sum(base_Test["hits"]=="[]")

btr_cd_empty = sum(base_Train["customDimensions"]=="[]")
btr_h_empty = sum(base_Train["hits"]=="[]")


In [12]:
print(f""" 
      Vacíos ajustados en base_Test:
        customDimensions: {bte_cd_empty}
        hits: {bte_h_empty}
      
      Vacíos ajustados en base_Train:
        customDimensions: {btr_cd_empty}
        hits: {btr_h_empty}
      """)

 
      Vacíos ajustados en base_Test:
        customDimensions: 0
        hits: 0
      
      Vacíos ajustados en base_Train:
        customDimensions: 0
        hits: 0
      


Estructura del dataset

In [138]:
base_Test.head(5)

Unnamed: 0,_id,channelGrouping,customDimensions,date,device,fullVisitorId,geoNetwork,hits,socialEngagementType,totals,trafficSource,visitId,visitNumber,visitStartTime
0,648f8586aec8990bd2ff8e99,Organic Search,"[{'index': '4', 'value': 'APAC'}]",20180511,"{'browser': 'Chrome', 'browserVersion': 'not a...",7460955084541987166,"{'continent': 'Asia', 'subContinent': 'Souther...","[{'hitNumber': '1', 'time': '0', 'hour': '21',...",Not Socially Engaged,"{'visits': '1', 'hits': '4', 'pageviews': '3',...","{'referralPath': '(not set)', 'campaign': '(no...",1526099341,2,1526099341
1,648f8586aec8990bd2ff8e9a,Direct,"[{'index': '4', 'value': 'North America'}]",20180511,"{'browser': 'Chrome', 'browserVersion': 'not a...",460252456180441002,"{'continent': 'Americas', 'subContinent': 'Nor...","[{'hitNumber': '1', 'time': '0', 'hour': '11',...",Not Socially Engaged,"{'visits': '1', 'hits': '4', 'pageviews': '3',...","{'referralPath': '(not set)', 'campaign': '(no...",1526064483,166,1526064483
2,648f8586aec8990bd2ff8e9b,Organic Search,"[{'index': '4', 'value': 'North America'}]",20180511,"{'browser': 'Chrome', 'browserVersion': 'not a...",3461808543879602873,"{'continent': 'Americas', 'subContinent': 'Nor...","[{'hitNumber': '1', 'time': '0', 'hour': '12',...",Not Socially Engaged,"{'visits': '1', 'hits': '4', 'pageviews': '3',...","{'referralPath': '(not set)', 'campaign': '(no...",1526067157,2,1526067157
3,648f8586aec8990bd2ff8e9c,Direct,"[{'index': '4', 'value': 'North America'}]",20180511,"{'browser': 'Chrome', 'browserVersion': 'not a...",975129477712150630,"{'continent': 'Americas', 'subContinent': 'Nor...","[{'hitNumber': '1', 'time': '0', 'hour': '23',...",Not Socially Engaged,"{'visits': '1', 'hits': '5', 'pageviews': '4',...","{'referralPath': '(not set)', 'campaign': '(no...",1526107551,4,1526107551
4,648f8586aec8990bd2ff8e9d,Organic Search,"[{'index': '4', 'value': 'North America'}]",20180511,"{'browser': 'Internet Explorer', 'browserVersi...",8381672768065729990,"{'continent': 'Americas', 'subContinent': 'Nor...","[{'hitNumber': '1', 'time': '0', 'hour': '10',...",Not Socially Engaged,"{'visits': '1', 'hits': '5', 'pageviews': '4',...","{'referralPath': '(not set)', 'campaign': '(no...",1526060254,1,1526060254


Validación de operaciones con la base cargada

In [131]:
base_Test.channelGrouping.value_counts().compute()

channelGrouping
Organic Search    198378
Direct             76076
Referral           59498
Social             36881
Paid Search        12834
Affiliates         10833
Display             7076
(Other)                6
Name: count, dtype: int64

Perfilamiento de los datos en formato html

In [None]:
muestra = base_Test.sample(frac=0.2)
profile = ProfileReport(muestra.compute())
profile
profile.to_file("reporte_base_Test.html")

## 5 Conclusiones

* La elección de MongoDB se basó en las capacidades de administración de grandes volúmenes de datos, al igual que, a la naturaleza de estos, ya que varias columnas estaban representadas en formato json, dando una vista documental.

* Una vez establecida la bases de datos, se buscó una forma eficiente de poder trabajar con los mismos, para ello, con Dask se utilizó computación paralela y distribuida para implementar diversas operaciones y al final computar los resultados. 
