In [1]:
# %load_ext pycodestyle_magic

In [2]:
# %pycodestyle_on

In [3]:
# Import modules

import naas
import naas_drivers
import json
from datetime import datetime, timedelta, date
import time
import pandas as pd
import openpyxl
import numpy as np
from pymongo import ASCENDING, DESCENDING
import os.path
from os import path
import plotly.express as px
import sys

In [4]:
# Set variables

# -> Repository
# Working folders, relative to the notebook's current directory.
INPUT_FOLDER = os.path.join('..', 'input')
OUTPUT_FOLDER = os.path.join('..', 'output')

# exist_ok=True replaces the check-then-create pattern: it cannot race with
# another process and keeps the cell safe to re-run.
for _folder in (INPUT_FOLDER, OUTPUT_FOLDER):
    os.makedirs(_folder, exist_ok=True)

# -> MongoDB
MONGO_DB = None  # replaced by the client returned by connect_mongo()
USE_MONGO = True
USE_HEALTHCHECKS = True
DB_SRC = 'covid-19'
DB_APP = 'app-wsr'

# -> Date
# Read the clock once so NOW and TODAY can never fall on different days
# when the cell happens to run around midnight.
_RUN_STARTED_AT = datetime.now()
NOW = _RUN_STARTED_AT.strftime('%d/%m/%Y %H:%M:%S')
TODAY = _RUN_STARTED_AT.date()
YESTERDAY = (TODAY - timedelta(days=1))
B_YESTERDAY = (TODAY - timedelta(days=2))

# -> Other
LOGO_CS = "https://bob.cashstory.com/assets/images/cashstory_all_white.png"

In [5]:
# Connect to MongoDB

def connect_mongo():
    """Connect to MongoDB through naas_drivers and return the client.

    The credentials are read from the naas secret named "mongo_secret"
    and handed to ``naas_drivers.mongo.connect``.

    Returns:
        The object returned by ``naas_drivers.mongo.get_client()``.
    """
    # Load conf
    credentials = naas.secret.get("mongo_secret")

    # Connect, then hand back the client held by the driver
    naas_drivers.mongo.connect(credentials)
    return naas_drivers.mongo.get_client()


# Open the connection only when MongoDB usage is enabled; otherwise
# MONGO_DB keeps the value it was given in the variables cell.
if USE_MONGO:
    MONGO_DB = connect_mongo()

Successfully connected to MongoDB


In [6]:
# Init REF WSR
# Name and path of the reference workbook: INPUT_FOLDER/REF_WSR.xlsx
REF_WSR_NAME = 'REF_WSR'
REF_WSR_PATH = os.path.join(INPUT_FOLDER, f'{REF_WSR_NAME}.xlsx')
# Sheet of that workbook read by load_ref_france()
REF_FRANCE_NAME = 'REF_FRANCE'


# Load REF_FRANCE
def load_ref_france():
    """Read the REF_FRANCE sheet of the reference workbook.

    The DEP_CODE and DEP_NAME columns are cast to strings and combined
    into a DEP_FULL column formatted as "<DEP_CODE> - <DEP_NAME>".

    Returns:
        pandas.DataFrame: the sheet content plus the DEP_FULL column.
    """
    ref = pd.read_excel(REF_WSR_PATH, sheet_name=REF_FRANCE_NAME)
    for column in ('DEP_CODE', 'DEP_NAME'):
        ref[column] = ref[column].astype(str)
    ref['DEP_FULL'] = ref['DEP_CODE'] + ' - ' + ref['DEP_NAME']
    return ref


# Load the France reference table once into a module-level constant.
REF_FRANCE = load_ref_france()

In [7]:
# Map chart
# -> Load Basemap
def load_map(file_name):
    """Load a JSON basemap stored in INPUT_FOLDER.

    Args:
        file_name: base name of the file, without the ``.json`` extension.

    Returns:
        The decoded JSON content.

    Raises:
        FileNotFoundError: if INPUT_FOLDER/<file_name>.json does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    json_file = os.path.join(INPUT_FOLDER, f'{file_name}.json')
    # JSON is UTF-8 by specification; do not rely on the platform's default
    # encoding, which would garble accented names on non-UTF-8 locales.
    with open(json_file, encoding='utf-8') as f:
        return json.load(f)


# Basemaps loaded once from INPUT_FOLDER/<name>.json

# US - 500K
MAP_STATES = load_map('usa_states')

# US - 20m
MAP_STATES_20M = load_map('usa_states_20m')

# US - 5m
MAP_STATES_5M = load_map('usa_states_5m')

# France - Region
MAP_FRANCE_REG = load_map('france-region')

# France - Department
MAP_FRANCE_DEP = load_map('france-departement')

# World map
WORLDMAP = load_map('world-med')


# -> Function
def analysis(df, variable):
    """Print summary statistics for one numeric column.

    The printed line contains the minimum, maximum, rounded mean, rounded
    median, and the mean/max and median/max ratios (two decimals).

    Args:
        df: pandas.DataFrame holding the column.
        variable: name of the column to summarise.

    Returns:
        None. The summary is written to stdout.
    """
    values = df[variable]
    minimum = values.min()
    maximum = values.max()
    average = round(values.mean(), 0)
    # Series.median skips missing values like min/max/mean above do;
    # np.median would turn the whole statistic into NaN on a single gap.
    median = round(values.median(), 0)
    average_ratio = round((average / maximum), 2)
    median_ratio = round((median / maximum), 2)
    # Each fragment ends with ", " so the joined line keeps a space after
    # every comma.
    print(f'{variable} : Min: {minimum}, Max: {maximum}, '
          f'Average: {average}, AvrMax: {average_ratio}, '
          f'Med: {median}, MedMax: {median_ratio}')


# Function to add CSS
def updateChartCss(filename, css_filename):
    """Inject the content of a CSS file into an HTML chart file.

    The stylesheet is inserted right after ``<body>`` inside a
    ``<style id="cs_css">`` element and the HTML file is rewritten in place.
    When the file already contains ``id="cs_css"`` nothing is written and
    "to do" is printed (replacing an existing style is not implemented).

    Args:
        filename: path of the HTML file to update.
        css_filename: path of the CSS file to embed.
    """
    # Explicit UTF-8: HTML/CSS content must not depend on the platform's
    # default encoding.
    with open(filename, encoding='utf-8') as f:
        page = f.read()

    with open(css_filename, encoding='utf-8') as f:
        css = f.read()

    if 'id="cs_css"' in page:
        print("to do")
        return

    result = page.replace("<body>",
                          f'<body><style id="cs_css">{css}</style>')
    # The context manager closes the file; no explicit close() is needed.
    with open(filename, "w", encoding='utf-8') as f:
        f.write(result)

In [8]:
# -- Datasource : Check if url exists and return df
# default separator = ','
def check_url(url, separator=','):
    """Read a CSV from `url`, falling back to an empty DataFrame on error.

    Args:
        url: location passed to ``pandas.read_csv``.
        separator: field delimiter, ',' by default.

    Returns:
        pandas.DataFrame: the parsed CSV, or an empty frame when reading
        failed (the error details are printed).
    """
    try:
        return pd.read_csv(url, sep=separator)
    except Exception as err:
        # Best effort: report the failure and let the caller see an empty frame.
        for line in (f'Error connecting to {url}.', err.__doc__, str(err)):
            print(line)
    return pd.DataFrame()

In [9]:
# -- Datasource local : open csv in local if path exists or open DB MONGO
def get_datasource(src_name):
    """Load the source `src_name` from a local CSV, else from MongoDB.

    When OUTPUT_FOLDER/<src_name>.csv exists it is read with ';' as
    separator and, if the frame has rows, an 'Unnamed: 0' column is dropped
    (presumably the index column written by a previous to_csv - confirm).
    Otherwise the collection `src_name` of database DB_SRC is read and,
    if the frame has rows, its '_id' column is dropped; when it has no rows
    a message is printed.

    Returns:
        pandas.DataFrame: the loaded data (possibly empty).
    """
    src_path = os.path.join(OUTPUT_FOLDER, f'{src_name}.csv')
    if path.exists(src_path):
        df = pd.read_csv(src_path, sep=';', low_memory=False)
        if len(df) > 0 and 'Unnamed: 0' in df.columns:
            df = df.drop('Unnamed: 0', axis=1)
    else:
        # NOTE(review): `bob` is not imported or defined anywhere in the
        # visible notebook (the connection is made through naas_drivers.mongo),
        # so this branch looks like it raises NameError - confirm which
        # driver call is intended here.
        df = bob.mongo.read_df(src_name, DB_SRC)
        if len(df) > 0:
            df = df.drop('_id', axis=1)
        else:
            print(f'Source {src_name} does not exist !')
    return df

In [10]:
# -- Trend : Create date scenario
# col scenario default : 'DATE_SCENARIO'
def get_lastdays(df, limit, title, col='DATE_SCENARIO'):
    """Keep the rows of the `limit` most recent DATE_ORDER values.

    Rows are returned grouped by DATE_ORDER in descending order, keeping
    their original index, and tagged with `title` in column `col`.

    Args:
        df: pandas.DataFrame, expected to hold a 'DATE_ORDER' column.
        limit: number of distinct DATE_ORDER values to keep.
        title: value written in the scenario column.
        col: name of the scenario column, 'DATE_SCENARIO' by default.

    Returns:
        pandas.DataFrame: a new frame; empty (no columns) when `df` has no
        'DATE_ORDER' column or when no date is selected. `df` is not modified.
    """
    # Without the column there is nothing to select (the previous version
    # failed on an unbound variable here).
    if 'DATE_ORDER' not in df.columns:
        return pd.DataFrame()

    last_dates = (df['DATE_ORDER']
                  .sort_values(ascending=False)
                  .drop_duplicates()
                  .tolist()[:limit])
    frames = [df[df['DATE_ORDER'] == day] for day in last_dates]
    if not frames:
        return pd.DataFrame()

    # One concat instead of DataFrame.append in a loop: append was removed in
    # pandas 2.0 and grew the frame quadratically.
    result = pd.concat(frames)
    result[col] = title
    return result

In [11]:
def df_save(df, file_name, file_type):
    """Save a DataFrame in OUTPUT_FOLDER.

    Only ``file_type == 'csv'`` is handled: the frame is written to
    OUTPUT_FOLDER/<file_name>.csv with ';' as separator and a confirmation
    is printed. Any other `file_type` is ignored.

    Args:
        df: pandas.DataFrame to save.
        file_name: base name of the output file, without extension.
        file_type: output format; only 'csv' is supported.
    """
    if file_type == 'csv':
        file_path = os.path.join(OUTPUT_FOLDER, f'{file_name}.csv')
        df.to_csv(file_path, sep=';')
        # Report the destination path (the message used to repeat the name).
        print(f'{file_name} successfully saved in {file_path}')

In [None]:
from typing import List


def optimize_floats(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast the float64 columns of `df` to the smallest float type.

    The frame is modified in place and also returned.
    """
    float_cols = list(df.select_dtypes(include=['float64']).columns)
    downcasted = df[float_cols].apply(pd.to_numeric, downcast='float')
    df[float_cols] = downcasted
    return df


def optimize_ints(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast the int64 columns of `df` to the smallest integer type.

    The frame is modified in place and also returned.
    """
    int_cols = list(df.select_dtypes(include=['int64']).columns)
    downcasted = df[int_cols].apply(pd.to_numeric, downcast='integer')
    df[int_cols] = downcasted
    return df


def optimize_objects(df: pd.DataFrame, datetime_features: List[str] = []) -> pd.DataFrame:
    """Convert object columns to 'category' or datetime.

    Object columns listed in `datetime_features` are parsed with
    ``pd.to_datetime``. Every other object column becomes 'category' when
    fewer than half of its values are distinct. The frame is modified in
    place and also returned.

    Args:
        df: frame to optimise.
        datetime_features: names of the object columns holding dates. The
            default list is only read, never mutated.

    Returns:
        pandas.DataFrame: the same `df` object.
    """
    for col in df.select_dtypes(include=['object']):
        if col in datetime_features:
            df[col] = pd.to_datetime(df[col])
            continue

        num_unique_values = len(df[col].unique())
        num_total_values = len(df[col])
        # Integer form of `unique / total < 0.5`: same result for non-empty
        # columns, and no ZeroDivisionError when the frame has zero rows.
        if 2 * num_unique_values < num_total_values:
            df[col] = df[col].astype('category')
    return df


# Function to optimise DF size
def optimize(df: pd.DataFrame, datetime_features: List[str] = []):
    """Shrink `df` by running the three optimisation passes in sequence.

    Object columns are handled first (with `datetime_features` forwarded),
    then integer columns, then float columns; the result of the last pass
    is returned.
    """
    df = optimize_objects(df, datetime_features)
    df = optimize_ints(df)
    return optimize_floats(df)

In [12]:
# import xlsxwriter 

# def add_format_cs(writer,sheetname,df,start_row):
#     # Get the xlsxwriter workbook and worksheet objects.
#     workbook  = writer.book
#     worksheet = writer.sheets[sheetname]
    
#     # Hide gridlines
#     worksheet.hide_gridlines(2)

#     # Add a header format.
#     header_format = workbook.add_format({
#         'bold': True,
#         'text_wrap': False,
#         'valign': 'center',
#         'align': 'center',
#         'font_name': 'Arial', 
#         'font_size': '10',
#         'font_color': '#ffffff',
#         'fg_color': '#4caf50',
#         'border': 1,
#         'border_color': '#808080'})

#     # Write the column headers with the defined format.
#     for col_num, value in enumerate(df.columns.values):
#         worksheet.write(start_row, col_num, value, header_format)
        
#     # Auto-size colums length
#     #Iterate through each column and set the width == the max length in that column. A padding length of 2 is also added.
#     for i, col in enumerate(df.columns):
#         # find length of column i
#         column_len = df[col].astype(str).str.len().max()
#         # Setting the length if the column header is larger
#         # than the max column value length
#         column_len = max(column_len, len(col)) + 2
#         # set the column length
#         worksheet.set_column(i, i, column_len)
        
#     # Add CS Logo
#     worksheet.insert_image('A1', LOGO_CS, {'x_scale': 0.1, 'y_scale': 0.1})