# Use databases and dataviz tools to empower analysis

In this notebook we will fetch data from a MinIO bucket, insert it into database tables and visualize the results in a dataviz interface.

### 1.1.0 Get data from MinIO

As in the previous notebook, create a MinIO client, fetch your Parquet file and read it with pandas.

In [17]:
# import dependencies
from minio import Minio
import urllib3
import pandas as pd
import pyarrow.parquet as pq
from io import BytesIO, StringIO


In [3]:
## Create a client with the access key and the secret key given
client = Minio(
    "storage-api.course.aiengineer.sandbox-atos.com",
    access_key="tempTP1",
    secret_key="tempTP1pw",
    secure=True,
    http_client=urllib3.PoolManager(
        
        retries=urllib3.Retry(
            total=5,
            backoff_factor=0.2,
            status_forcelist=[500, 502, 503, 504],
        ),
    ),
)

In [10]:
### path to the object in MinIO
path_minio='datasets/chicago/trips.parquet'

### Reuse your bucket name
bucket='john-doe'

In [20]:
# Get the object with get_object, wrap its bytes in BytesIO and read the Parquet content with pandas
response = None
try:
    response = client.get_object(bucket, path_minio)
    # Read data from response.
    parquet_object = BytesIO(response.data)
    data = pd.read_parquet(parquet_object)
finally:
    # Release the connection only if get_object() returned a response
    if response is not None:
        response.close()
        response.release_conn()
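
The cleanup pattern used in this cell can be tried without a MinIO server. The following is a minimal sketch, not the course's reference code: `fetch_bytes`, `FakeResponse` and `FakeClient` are hypothetical names, and the stubs only mimic the `get_object`, `read`, `close` and `release_conn` calls used here.

```python
from io import BytesIO


def fetch_bytes(client, bucket, path):
    """Return the object's content as a BytesIO, always releasing the connection."""
    response = None
    try:
        response = client.get_object(bucket, path)
        return BytesIO(response.read())
    finally:
        # Only clean up if get_object() actually returned a response.
        if response is not None:
            response.close()
            response.release_conn()


class FakeResponse:
    """Hypothetical stand-in for the HTTP response returned by get_object()."""

    def __init__(self, payload):
        self._payload = payload
        self.closed = False
        self.released = False

    def read(self):
        return self._payload

    def close(self):
        self.closed = True

    def release_conn(self):
        self.released = True


class FakeClient:
    """Hypothetical stand-in for a Minio client, for offline testing only."""

    def __init__(self, payload):
        self.response = FakeResponse(payload)

    def get_object(self, bucket, path):
        return self.response


fake = FakeClient(b"parquet-bytes")
buffer = fetch_bytes(fake, "john-doe", "datasets/chicago/trips.parquet")
print(buffer.getvalue(), fake.response.closed, fake.response.released)
```

With a real client, the returned buffer could presumably be passed straight to `pd.read_parquet`.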

In [19]:
data.head()

| | tips | trip_start_timestamp | trip_seconds | trip_miles | pickup_community_area | pickup_centroid_latitude | pickup_centroid_longitude | dropoff_community_area | fare | tolls | extras | trip_total |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.0 | 2019-01-31T23:45:00.000 | 746.0 | 3.34 | 6.0 | 41.944227 | -87.655998 | 16.0 | 11.75 | 0.0 | 0.0 | 11.75 |
| 1 | 0.0 | 2019-01-31T23:45:00.000 | 681.0 | 3.0 | 8.0 | 41.899602 | -87.633308 | 24.0 | 11.25 | 0.0 | 0.0 | 11.25 |
| 2 | 11.0 | 2019-01-31T23:45:00.000 | 2280.0 | 15.2 | 56.0 | 41.792592 | -87.769615 | 22.0 | 39.0 | 0.0 | 5.0 | 55.0 |
| 3 | 2.0 | 2019-01-31T23:45:00.000 | 360.0 | 1.2 | 8.0 | 41.899602 | -87.633308 | 32.0 | 6.5 | 0.0 | 0.0 | 8.5 |
| 4 | 8.4 | 2019-01-31T23:45:00.000 | 1500.0 | 11.5 | 76.0 | 41.980264 | -87.913625 | 4.0 | 29.75 | 0.0 | 4.0 | 42.15 |


### 1.1.1 ClickHouse: create database and tables

Here we want to store our numerical features for future analysis, model training and preprocessing.

In [21]:
# import dependencies
import pandahouse as ph

#### Create clickhouse connection

In [22]:
## The connection dict needs a default database
connection = dict(database='default',
                  host='http://clickhouse-install.clickhouse.svc.cluster.local:8123',
                  user='admin',
                  password='B1gdata-demo')

In [48]:
ph.read_clickhouse('show databases',connection=connection)

| | name |
|---|---|
| 0 | INFORMATION_SCHEMA |
| 1 | default |
| 2 | guillaumedb |
| 3 | information_schema |
| 4 | system |


In [32]:
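### Helper to run statements that return no rows (CREATE, DROP, ...) with this Python client
def write_clickhouse(query, connection):
    print(query)
    try:
        ph.read_clickhouse(query, connection=connection)
    except KeyError:
        # The statement produced no result set to load into a DataFrame
        print("Nothing to return")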

**Create a database named firstname_lastname, based on your credentials**

In [None]:
# firstname_lastname: same as your credentials, but with "_" instead of "-" because ClickHouse does not allow "-" in an unquoted database name
dbname = 'john_doe'
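
The renaming rule described in the comment above can be written as a small helper. This is only a sketch: `to_db_name` is a hypothetical function, and lowercasing and prefixing a leading digit are extra assumptions made here to keep the name usable as an unquoted identifier.

```python
import re


def to_db_name(bucket_name):
    """Turn a bucket-style name into an identifier made of letters, digits and "_"."""
    # Replace every character that is not a letter, a digit or "_" with "_".
    name = re.sub(r"[^0-9a-zA-Z_]", "_", bucket_name.lower())
    # Unquoted identifiers should not start with a digit, so prefix those with "_".
    if name and name[0].isdigit():
        name = "_" + name
    return name


print(to_db_name("john-doe"))  # john_doe
```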

In [27]:
### create a personal database
write_clickhouse(f"create database {dbname}",connection)

Operation succeeded


In [30]:
### point the connection dict at the personal database
connection['database'] = dbname

In [34]:
connection

{'database': 'john_doe',
 'host': 'http://clickhouse-install.clickhouse.svc.cluster.local:8123',
 'user': 'admin',
 'password': 'B1gdata-demo'}

#### Create the ClickHouse table chicago_taxi

In [None]:
dbtable='chicago_taxi'

In [None]:
### select features (copy so that later column changes do not touch a view of `data`)
features = data[[
    "tips",
    "trip_start_timestamp",
    "trip_seconds",
    "trip_miles",
    "pickup_community_area",
    "dropoff_community_area",
    "fare",
    "tolls",
    "extras",
    "trip_total"
]].copy()

In [43]:
### Create the table that will receive the taxi trip dataset
## ClickHouse table definition
# Using the DataFrame dtypes and the ClickHouse documentation, write the CREATE TABLE statement
taxitable = f"""
CREATE TABLE {dbname}.{dbtable}
(
    `tips` Float32,
    `trip_start_timestamp` DateTime,
    `trip_seconds` Float32,
    `trip_miles` Float32,
    `pickup_community_area` Float32,
    `dropoff_community_area` Float32,
    `fare` Float32,
    `tolls` Float32,
    `extras` Float32,
    `trip_total` Float32
) 
ENGINE = MergeTree
PARTITION BY toYYYYMM(trip_start_timestamp)
ORDER BY trip_start_timestamp;
"""

In [None]:
write_clickhouse(taxitable,connection)
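
The hand-written statement above can also be derived from column types. The sketch below is an illustration under stated assumptions: `build_create_table` and the `DTYPE_TO_CLICKHOUSE` mapping are hypothetical, and the mapping only covers a few pandas dtype names.

```python
# Assumed mapping from pandas dtype names to ClickHouse column types.
DTYPE_TO_CLICKHOUSE = {
    "float32": "Float32",
    "float64": "Float64",
    "int64": "Int64",
    "datetime64[ns]": "DateTime",
    "object": "String",
}


def build_create_table(database, table, dtypes, order_by):
    """Build a MergeTree CREATE TABLE statement from a {column: dtype name} dict."""
    columns = ",\n".join(
        f"    `{name}` {DTYPE_TO_CLICKHOUSE[dtype]}" for name, dtype in dtypes.items()
    )
    return (
        f"CREATE TABLE {database}.{table}\n"
        f"(\n{columns}\n)\n"
        f"ENGINE = MergeTree\n"
        f"ORDER BY {order_by}"
    )


statement = build_create_table(
    "john_doe",
    "chicago_taxi",
    {"tips": "float64", "trip_start_timestamp": "datetime64[ns]"},
    order_by="trip_start_timestamp",
)
print(statement)
```

With a DataFrame at hand, the `dtypes` argument could be built with `{c: str(t) for c, t in features.dtypes.items()}`.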

#### Insert the data into the chicago_taxi table

In [45]:
## The values must match the ClickHouse DateTime type,
## so we force the '%Y-%m-%d %H:%M:%S' format with pandas.
## assign() returns a new DataFrame, which avoids pandas' SettingWithCopyWarning
features = features.assign(
    trip_start_timestamp=pd.to_datetime(data["trip_start_timestamp"]).dt.strftime('%Y-%m-%d %H:%M:%S')
)
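
To see what this conversion does to a single value, here is a minimal standard-library sketch. The helper name `to_clickhouse_datetime` is hypothetical, and the source format is assumed from the timestamps shown in the `data.head()` preview.

```python
from datetime import datetime

SOURCE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"   # format seen in the dataset preview
CLICKHOUSE_FORMAT = "%Y-%m-%d %H:%M:%S"  # format forced before the insert


def to_clickhouse_datetime(value):
    """Reformat one ISO-like timestamp string for a DateTime column."""
    return datetime.strptime(value, SOURCE_FORMAT).strftime(CLICKHOUSE_FORMAT)


converted = to_clickhouse_datetime("2019-01-31T23:45:00.000")
print(converted)  # 2019-01-31 23:45:00
```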

In [46]:
### insert using the to_clickhouse function
ph.to_clickhouse(features, dbtable, index=False, chunksize=100000, connection=connection)

10000

### 1.1.2 PostgreSQL: create database and table

Here we want to store a reference table of pickup community areas with their latitude and longitude, to feed future map analysis.

In [None]:
# import dependencies: psycopg2 (installed from the psycopg2-binary package)
import psycopg2


In [None]:
# From the data, build a deduplicated reference table without null values
scope = data[[
    "pickup_community_area",
    "pickup_centroid_latitude",
    "pickup_centroid_longitude"
    ]].drop_duplicates().dropna()

In [None]:
# check that the number of rows in the reference table is consistent
len(scope)

In [None]:
# display the first rows of the reference table
scope.head()

#### Create postgres connection

In [None]:
# postgres is the default database; autocommit is needed for database-level statements such as CREATE DATABASE
conn = psycopg2.connect(
   database="postgres", user='postgres', password='B1gdata-demo', host='mypostgres.kubegres.svc.cluster.local', port= '5432'
)
conn.autocommit = True
cursor = conn.cursor()

#### Create postgres personal DB

In [None]:
## Define the postgres database name
# firstname_lastname: same as your credentials, but with "_" instead of "-" because PostgreSQL does not allow "-" in an unquoted database name
dbname = 'john_doe'

In [None]:
# write the query to create a new database named with the dbname var
sqlCreateDb = f""" create database {dbname}"""
# execute the query using the cursor
cursor.execute(sqlCreateDb)

In [None]:
## Now reconnect, this time to your personal database
conn = psycopg2.connect(
   database=dbname, user='postgres', password='B1gdata-demo', host='mypostgres.kubegres.svc.cluster.local', port= '5432'
)
conn.autocommit = True
cursor = conn.cursor()

#### Create table in postgres

In [None]:
### set table name
table_name='chicago_areas'

In [None]:
### Define the table using the PostgreSQL documentation
# focus on schema and types
# Using the DataFrame dtypes and the PostgreSQL documentation, write the CREATE TABLE statement

areas_table=f"""
CREATE TABLE IF NOT EXISTS {table_name} (
   pickup_community_area double precision,
   pickup_centroid_latitude double precision,
   pickup_centroid_longitude double precision )
"""

In [None]:
## execute the table creation query
cursor.execute(areas_table)

#### Insert data into table

In [None]:
def insert_df_to_table(df, table, conn, cursor):
    """
    Insert the rows of a pandas DataFrame into a PostgreSQL table
    """
    # prepare an in-memory text buffer to stream the data
    output = StringIO()
    # write the data into the buffer as tab-separated values
    df.to_csv(output, sep='\t', header=False, index=False)
    # move the buffer position back to the start
    output.seek(0)
    # copy the buffer content into the table
    cursor.copy_from(output, table, null="")  # empty fields are loaded as NULL
    conn.commit()
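
To make the streamed content concrete, the sketch below builds the same kind of tab-separated buffer with the standard library only. `rows_to_tsv_buffer` is a hypothetical helper, the first sample row comes from the data preview, and the `None` in the second row is invented here to show how a missing value becomes an empty field.

```python
import csv
from io import StringIO


def rows_to_tsv_buffer(rows):
    """Write rows as tab-separated text and rewind the buffer for reading."""
    output = StringIO()
    writer = csv.writer(output, delimiter="\t", lineterminator="\n")
    for row in rows:
        # Missing values are written as empty fields.
        writer.writerow(["" if value is None else value for value in row])
    output.seek(0)
    return output


buffer = rows_to_tsv_buffer([
    (6.0, 41.944227, -87.655998),
    (8.0, None, -87.633308),
])
tsv_text = buffer.getvalue()
print(repr(tsv_text))
```

A buffer like this is what `copy_from` reads line by line, with `null=""` telling it to load the empty field as NULL.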

In [None]:

insert_df_to_table(scope,table_name,conn,cursor)

#### Verify if content is loaded properly

In [None]:
# define a select statement to get the first 5 records of your areas table
selectexp = f" select * from {table_name} limit 5 "

In [None]:
# query the database and return a pandas DataFrame using the read_sql_query function from pandas
frame = pd.read_sql_query(selectexp,conn)

In [None]:
# Check the 5 rows you selected
frame.head()

### 1.1.3 Use Kafka brokers and topics to send your data event by event

In [None]:
# import dependencies
# !pip install kafka-python
from kafka import KafkaProducer

In [None]:
### Create a KafkaProducer object called producer, which publishes messages to a topic
producer = KafkaProducer(
    bootstrap_servers=[
        "clusterka-kafka-bootstrap.kafka.svc.cluster.local"
    ],
    # messages are plain strings, sent as UTF-8 bytes
    value_serializer=lambda x: x.encode('utf-8'),
)


In [None]:
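### persist the data locally so it can be streamed line by line
import os
os.makedirs("./chicagodata", exist_ok=True)  # make sure the target folder exists
data.to_csv("./chicagodata/to_be_sent_into_kafka.csv")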

#### Create the topic via kafka admin

In [None]:
### The topic is named after your database name
topicName = dbname 

In [None]:
# import dependencies
from kafka.admin import KafkaAdminClient, NewTopic

In [None]:
### Create the admin client
admin_client = KafkaAdminClient(
    bootstrap_servers=[
        "clusterka-kafka-bootstrap.kafka.svc.cluster.local"
        ])

In [None]:
### Create an empty list, create a topic with the minimum configuration, add it to the list and call create_topics with this list
topic_list = []
topic_list.append(NewTopic(name=topicName, num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)

### Before executing the next cell, open and run the last notebook: [2_receive_stream_data.ipynb](./2_receive_stream_data.ipynb)

In [None]:
### read the local file line by line and send each line as an event with the producer (see the kafka-python documentation)
with open("./chicagodata/to_be_sent_into_kafka.csv") as f:
    for i, line in enumerate(f):
        # skip the CSV header on the first line
        if i > 0:
            producer.send(topicName, line)
# block until all buffered messages have been sent
producer.flush()
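
The sending loop can be dry-run without a broker. In this sketch, `RecordingProducer` and `send_lines` are hypothetical names: the stub only exposes the `send` and `flush` calls used above. Unlike the cell above, it also strips the trailing newline and skips empty lines, which is an assumption about what the consumer expects.

```python
from io import StringIO


class RecordingProducer:
    """Hypothetical stand-in exposing the send()/flush() calls used above."""

    def __init__(self):
        self.sent = []
        self.flushed = False

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        self.flushed = True


def send_lines(producer, topic, lines):
    """Send every non-empty line except the header; return the number of events sent."""
    count = 0
    for i, line in enumerate(lines):
        line = line.rstrip("\n")
        if i == 0 or not line:
            continue  # skip the CSV header and blank lines
        producer.send(topic, line)
        count += 1
    producer.flush()
    return count


sample = StringIO(",tips,fare\n0,0.0,11.75\n1,0.0,11.25\n")
dry_run = RecordingProducer()
sent_count = send_lines(dry_run, "john_doe", sample)
print(sent_count, dry_run.sent[0])
```

The same function would accept the real `producer` and an open file handle in place of the stub and the sample buffer.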

### 1.1.4 Visualize on Superset

Go to [https://dataviz.course.aiengineer.sandbox-atos.com/](https://dataviz.course.aiengineer.sandbox-atos.com/) and log in with your account.

In Data > Databases you should see a database named `clickhouse`. This will be our source.

![source](./images/source.png)

With this source we will create a Superset dataset. A dataset maps a table and allows exploration and chart creation on top of it.

![dataset](./images/dataset.png)

![table](./images/table.png)

Once you have chosen the dataset, click on it and start creating charts.

![tips](./images/tips.png)

**In this example**

- The chart type is a bar chart
- No time range is set, because the dataset only contains old dates
- The metric is the average of tips (y axis)
- The series is the pickup location (x axis)

You can name and save the chart, and assign it to a dashboard.
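
The example chart boils down to an average of `tips` grouped by `pickup_community_area`. The sketch below reproduces that aggregation in plain Python on the five rows of the `data.head()` preview. It only illustrates the computation, not how Superset queries ClickHouse, and `average_tips_by_area` is a hypothetical helper.

```python
from collections import defaultdict

# (pickup_community_area, tips) pairs taken from the data.head() preview
rows = [(6.0, 0.0), (8.0, 0.0), (56.0, 11.0), (8.0, 2.0), (76.0, 8.4)]


def average_tips_by_area(rows):
    """Return {area: average tips}, one value per bar of the chart."""
    totals = defaultdict(lambda: [0.0, 0])  # area -> [sum of tips, number of trips]
    for area, tips in rows:
        totals[area][0] += tips
        totals[area][1] += 1
    return {area: total / count for area, (total, count) in totals.items()}


averages = average_tips_by_area(rows)
print(averages)
```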


**Go further: create a dashboard with multiple visualizations that support feature analysis, and try to represent the dataset on a map**