In [1]:
from api import *
from functools import partial
from sqlalchemy import create_engine
import requests
import pandas as pd
import gmaps
import gmaps.datasets
import mysql.connector
# jupyter nbextension enable --py gmaps
# jupyter nbextension enable --py widgetsnbextension

import pyspark
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from ipywidgets import widgets, interact
from IPython.display import display
import json
from pyspark.sql.functions import mean, min, max

from pyspark.sql import SparkSession
# Start (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName("MinhaAplicacao").getOrCreate()
sc = spark.sparkContext
sqlContext = pyspark.SQLContext(sc)

# Load the Yelp review dump into a Spark DataFrame.
df = spark.read.json('yelp_academic_dataset_review.json')

# The Google Maps API key is not stored in the repository; it is read from a local file.
# strip() removes the trailing newline that readline() keeps, so the key is passed clean.
# The `with` block closes the file on exit, so no explicit close call is needed
# (the previous bare `f.close` was only an attribute reference and never ran).
with open('apikey.txt') as f:
    api_key = f.readline().strip()
gmaps.configure(api_key=api_key)
# Run `uvicorn api:app` in a terminal so the local API queried later in the notebook is up.

In [2]:
# Queries the local API for one user and returns the two heatmaps.
def mapping(user_id):
    """
    Build the mean and standard-deviation heatmaps for one user's businesses.

    Sends ``GET http://127.0.0.1:8000/user/<user_id>`` and reads the JSON body as a
    sequence of rows. From each row, index 3 feeds the 'Média' weights, index 4 the
    'Desvio Padrão' weights, and indexes 6 and 7 the latitude and longitude.
    NOTE(review): presumably the rows have the same layout as the join printed in the
    SQL query cell of this notebook - confirm against the API implementation.

    :param user_id:         str, id of the user to look up (surrounding whitespace is ignored)
    :return:                tuple ``(fig_med, fig_dp)`` of gmaps figures, each with one
                            heatmap layer weighted by 'Média' and 'Desvio Padrão' respectively
    :raises requests.HTTPError: if the API answers with a 4xx/5xx status
    """
    from urllib.parse import quote

    # Percent-encode the id so characters such as '/', '?' or spaces typed into the
    # widget cannot change the request path.
    url = "http://127.0.0.1:8000/user/" + quote(user_id.strip(), safe="")

    # A timeout keeps the cell from hanging forever when the API is not running.
    r = requests.get(url, timeout=30)
    # Fail loudly on error responses instead of iterating over an error payload
    # as if it were a list of rows.
    r.raise_for_status()
    ret = r.json()

    map_df = pd.DataFrame({
        'latitude': [places[6] for places in ret],
        'longitude': [places[7] for places in ret],
        'Média': [places[3] for places in ret],
        'Desvio Padrão': [places[4] for places in ret],
    })
    locations = map_df[['latitude', 'longitude']]

    # One figure per weight column, both drawn over the same coordinates.
    fig_med = gmaps.figure()
    fig_med.add_layer(gmaps.heatmap_layer(locations, weights=map_df['Média']))
    fig_dp = gmaps.figure()
    fig_dp.add_layer(gmaps.heatmap_layer(locations, weights=map_df['Desvio Padrão']))
    return fig_med, fig_dp

In [3]:
# Funções essenciais para conseguir processar os dados em um pandas dataframe
# créditos : https://gist.github.com/joshlk/871d58e01417478176e7
def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def topandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame`, converting each partition
    to pandas separately and concatenating the pieces. The DataFrame is repartitioned if
    `n_partitions` is passed.

    The result has a fresh 0..n-1 index and the column names of `df`. When `df` holds no
    rows, an empty frame with those column names is returned.

    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None
    :return:                pandas.DataFrame
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    frames = df.rdd.mapPartitions(_map_to_pandas).collect()

    # Partitions without rows come back as column-less frames; drop them so they cannot
    # affect the concatenation.
    frames = [frame for frame in frames if not frame.empty]
    if not frames:
        # Nothing to concatenate: assigning column names to a column-less frame would raise.
        return pd.DataFrame(columns=df.columns)

    # ignore_index avoids repeating each partition's own 0..k index in the result.
    df_pand = pd.concat(frames, ignore_index=True)
    df_pand.columns = df.columns
    return df_pand

### Criando database yelp

In [5]:
import os
from urllib.parse import quote_plus

# Credentials for the local MySQL server. They can be overridden with the MYSQL_USER and
# MYSQL_PASSWORD environment variables; the fallbacks keep the notebook running as before.
# NOTE(review): the fallback password is still committed with the notebook - remove it
# once the environment variables are set wherever this runs.
mysql_user = os.environ.get('MYSQL_USER', 'root')
mysql_password = os.environ.get('MYSQL_PASSWORD', 'MegaDados')

# No database name in the URL: this engine is the one used to drop and create `yelp`.
# quote_plus keeps characters such as '@' or '/' in the credentials from breaking the URL.
engine = create_engine(
    'mysql+pymysql://{}:{}@localhost/'.format(quote_plus(mysql_user), quote_plus(mysql_password))
)

In [6]:
# Open a connection on the engine created above.
conn = engine.connect()

In [7]:
# Remove the `yelp` database if it is already there (IF EXISTS makes this a no-op otherwise).
conn.execute("drop database if exists yelp")

<sqlalchemy.engine.result.ResultProxy at 0x1206ee780>

In [8]:
# Create the `yelp` database.
conn.execute("create database yelp")

<sqlalchemy.engine.result.ResultProxy at 0x111cda860>

In [9]:
# Send an explicit COMMIT statement on this connection.
# NOTE(review): presumably a safeguard after the DROP/CREATE above - confirm it is needed.
conn.execute("commit")

<sqlalchemy.engine.result.ResultProxy at 0x1206eeb70>

In [10]:
import os
from urllib.parse import quote_plus

# Credentials for the local MySQL server. They can be overridden with the MYSQL_USER and
# MYSQL_PASSWORD environment variables; the fallbacks keep the notebook running as before.
# NOTE(review): the fallback password is still committed with the notebook - remove it
# once the environment variables are set wherever this runs.
mysql_user = os.environ.get('MYSQL_USER', 'root')
mysql_password = os.environ.get('MYSQL_PASSWORD', 'MegaDados')

# Rebind `engine` to the `yelp` database; the to_sql calls below write through it.
# quote_plus keeps characters such as '@' or '/' in the credentials from breaking the URL.
engine = create_engine(
    'mysql+pymysql://{}:{}@localhost/yelp'.format(quote_plus(mysql_user), quote_plus(mysql_password))
)

### Selecionando usuários e inserindo a tabela idUsuarios

In [11]:
# Distinct user ids found in the review data (still a Spark DataFrame at this point).
df_users = df.select("user_id").distinct()

In [12]:
# Bring the distinct user ids into a local pandas DataFrame.
df_users_pd = df_users.toPandas()

In [13]:
# Write the user_id column to the `idUsuarios` table, replacing the table if it already
# exists; index=False keeps the pandas index out of the table.
df_users_pd['user_id'].to_sql(con=engine, name='idUsuarios', if_exists='replace', index=False)

In [14]:
# Rebind `conn` to a connection on the engine that points at the `yelp` database.
conn = engine.connect()
# Change user_id to VARCHAR(23).
# NOTE(review): presumably required so the column can be used as a key below - confirm.
conn.execute("ALTER TABLE idUsuarios MODIFY COLUMN user_id VARCHAR(23)")

<sqlalchemy.engine.result.ResultProxy at 0x12058ce80>

In [15]:
# Make user_id the primary key of idUsuarios.
conn.execute("ALTER TABLE idUsuarios ADD PRIMARY KEY (user_id)")

<sqlalchemy.engine.result.ResultProxy at 0x12073a0f0>

### Criando a tabela de relação usuários e business

In [16]:
# Distinct (user_id, business_id) pairs found in the review data (Spark DataFrame).
df_usr_bus = df.select('user_id','business_id').distinct()

In [17]:
# Collect the pairs into pandas with the partition-wise topandas() helper defined in this notebook.
df_usr_bus_pd = topandas(df_usr_bus)

In [18]:
# Write the pairs to the `Usuarios_Business` table, replacing the table if it already
# exists; index=False keeps the pandas index out of the table.
df_usr_bus_pd.to_sql(con=engine, name='Usuarios_Business', if_exists='replace', index=False)

In [19]:
# Change Usuarios_Business.user_id to VARCHAR(23), the same type given to idUsuarios.user_id.
conn.execute("ALTER TABLE Usuarios_Business MODIFY COLUMN user_id VARCHAR(23)")

<sqlalchemy.engine.result.ResultProxy at 0x12c8634e0>

In [20]:
# Change Usuarios_Business.business_id to VARCHAR(23).
conn.execute("ALTER TABLE Usuarios_Business MODIFY COLUMN business_id VARCHAR(23)")

<sqlalchemy.engine.result.ResultProxy at 0x12c8635f8>

In [21]:
# Tie Usuarios_Business.user_id to idUsuarios.user_id with a foreign key.
conn.execute("ALTER TABLE Usuarios_Business ADD FOREIGN KEY (user_id) REFERENCES idUsuarios(user_id)")

<sqlalchemy.engine.result.ResultProxy at 0x12073ada0>

### Criando a tabela de business

Depois de selecionar apenas os business em um dataframe Spark, foram feitas requisições à API Yelp Fusion para obter a localização de cada business. Com as localizações, foi construído o dataframe pandas usado para criar a tabela de business no banco de dados SQL.

In [22]:
# Load the business CSV, discard rows with any missing value, then keep only the first
# row for each business_id.
business_df = (
    pd.read_csv("df_business.csv")
    .dropna()
    .drop_duplicates(subset=['business_id'], keep='first')
)

In [23]:
# Write the business rows to the `idBusiness` table, replacing the table if it already
# exists; index=False keeps the pandas index out of the table.
business_df.to_sql(con=engine, name='idBusiness', if_exists='replace', index=False)

In [24]:
# Change idBusiness.business_id to VARCHAR(23).
conn.execute("ALTER TABLE idBusiness MODIFY COLUMN business_id VARCHAR(23)")

<sqlalchemy.engine.result.ResultProxy at 0x12052f400>

In [25]:
# Make business_id the primary key of idBusiness.
conn.execute("ALTER TABLE idBusiness ADD PRIMARY KEY (business_id)")

<sqlalchemy.engine.result.ResultProxy at 0x142c85e10>

In [26]:
# Schema setup is finished; release the SQLAlchemy connection.
conn.close()

### Consultando o banco de dados relacional

In [27]:
# connect_db and run_db_query are not defined in this notebook.
# NOTE(review): presumably they come from `from api import *` - confirm.
conn = connect_db()
# Pre-bind the connection as run_db_query's first argument, so a query runs as db(sql).
db = partial(run_db_query, conn)

In [28]:
# Businesses reviewed by one hard-coded user, joined with their idBusiness rows.
# NOTE(review): the WHERE clause filters on a column of the left table, which discards
# the NULL-extended rows of the RIGHT JOIN, so this returns the same rows as an inner join.
# SELECT * returns business_id twice (once from each table), as the printed rows show.
db('SELECT * FROM Usuarios_Business RIGHT JOIN idBusiness on idBusiness.business_id = Usuarios_Business.business_id  WHERE Usuarios_Business.user_id = "Ved7660-FWXXEob6v2qCWw"')

Executando query:
('Ved7660-FWXXEob6v2qCWw', 'bQ_wtZvMb__OhprY5bF9aQ', 'bQ_wtZvMb__OhprY5bF9aQ', 3.97518610421836, 1.22347667600239, 'Seafood', 36.11301, -115.1193049)
('Ved7660-FWXXEob6v2qCWw', 'DfgZlNgKwBvCpA_0alumXw', 'DfgZlNgKwBvCpA_0alumXw', 3.30387931034483, 1.34654383043478, 'Hotels', 36.121686, -115.175733)
('Ved7660-FWXXEob6v2qCWw', 'sNVGdeOPeitJ3OWUQBINzQ', 'sNVGdeOPeitJ3OWUQBINzQ', 4.09336250911743, 1.12763870554493, 'Breakfast & Brunch', 36.1136354358475, -115.261437935378)
('Ved7660-FWXXEob6v2qCWw', 'AV6weBrZFFBfRGCbcRGO4g', 'AV6weBrZFFBfRGCbcRGO4g', 2.61391509433962, 1.30045984294862, 'Hotels', 36.0973406760397, -115.17633942712)


### Utilizando nossa API para realizar consulta

In [32]:
# Text box for the user id; its current value is read by the next cell.
user_id = widgets.Text()
display(user_id)

Text(value='')

In [34]:
# Build both heatmaps for the id currently typed in the text box.
# NOTE(review): the result depends on manual widget input, so it is not reproduced by
# a plain Restart & Run All (the box is empty until someone types an id).
med_map,dp_map = mapping(user_id.value)

In [35]:
# Show the first figure returned by mapping(): the heatmap weighted by 'Média'.
med_map

Figure(layout=FigureLayout(height='420px'))

In [36]:
# Show the second figure returned by mapping(): the heatmap weighted by 'Desvio Padrão'.
dp_map

Figure(layout=FigureLayout(height='420px'))