In [None]:

def get_tables(file_name):
    '''
    Lazily scan one of the parquet tables stored in S3 by the ETL workflow.

    Args:
        file_name: table name without extension; it is interpolated into
            ``s3://ift-bucket-jafet/historical/{file_name}.parquet``.

    Returns:
        The result of ``pl.scan_parquet`` on that URI (a lazy frame; nothing
        is read until it is collected).
    '''
    # The URI is handed straight to polars. The s3fs filesystem object that
    # used to be created here was never used, so it has been dropped.
    source = f"s3://ift-bucket-jafet/historical/{file_name}.parquet"
    return pl.scan_parquet(source)



In [1]:
import polars as pl
from io import StringIO
import requests
import s3fs

'''
If you are not using S3 and only want to test, please run this cell.
'''
def download_files(table_name: str) -> pl.DataFrame:
    """Download one IFT table as CSV and parse it into a polars DataFrame.

    Args:
        table_name: table identifier, interpolated into
            ``https://bit.ift.org.mx/descargas/datos/tabs/{table_name}.csv``.

    Returns:
        The parsed CSV, with the schema inferred from up to 10000 rows.

    Raises:
        requests.exceptions.RequestException: if the request fails, times out
            or returns an HTTP error status. The error is printed and then
            re-raised.
    """
    url = f"https://bit.ift.org.mx/descargas/datos/tabs/{table_name}.csv"
    try:
        # A timeout keeps the notebook from hanging forever on a stalled server.
        response = requests.get(url, timeout=60)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error inesperado: {e}")
        # Re-raise: continuing would either hit an unbound `response` or
        # parse an HTTP error page as if it were the CSV payload.
        raise

    # Force UTF-8 decoding of the body before handing it to polars.
    response.encoding = 'utf-8'
    csv_string = StringIO(response.text)

    return pl.read_csv(csv_string, infer_schema_length=10000)
    
def tweak_df(df: pl.DataFrame, columns_transformations: list, schema: dict) -> pl.DataFrame:
    """Apply column expressions to ``df`` and then cast it to ``schema``.

    Args:
        df: frame to transform.
        columns_transformations: list of expressions handed to ``with_columns``.
        schema: mapping handed to ``cast`` once the expressions have been applied.

    Returns:
        The transformed and cast frame.
    """
    transformed = df.with_columns(columns_transformations)
    return transformed.cast(schema)

def dict_to_df(dict):
    """Build a polars frame from a mapping via ``pl.from_dict``.

    The parameter name shadows the builtin ``dict``; it is kept as-is so that
    existing keyword callers keep working.
    """
    frame = pl.from_dict(dict)
    return frame

def upload_df(df: pl.DataFrame, bucket_name: str, file_name: str):
    """Write ``df`` as parquet to ``s3://{bucket_name}/{file_name}.parquet``.

    Failures while opening or writing the object are printed and swallowed,
    so the function returns ``None`` whether or not the upload succeeded.
    """
    s3 = s3fs.S3FileSystem()
    target_uri = f"s3://{bucket_name}/{file_name}.parquet"
    try:
        with s3.open(target_uri, mode="wb") as sink:
            df.write_parquet(sink)
    except Exception as err:
        print(f"Error inesperado: {err}")

# Cleaning rules for every IFT table handled in this notebook, keyed by the
# table name that is interpolated into the download URL. Per table:
#   "schema":        column name -> polars dtype, applied with `cast`
#   "tweak_columns": expressions applied with `with_columns` before the cast
# The descriptive columns (K_GRUPO, GRUPO, K_EMPRESA, EMPRESA, CONCESIONARIO)
# are not listed in any schema; their Categorical casts were disabled earlier.
diccionario_datos = {
    "TD_LINEAS_INTMOVIL_ITE_VA": {
        "schema": {
            "FECHA": pl.Date,
            "ANIO": pl.Int16,
            "MES": pl.Int8,
            "L_PREPAGO_E": pl.Int32,
            "L_POSPAGO_E": pl.Int32,
            "L_POSPAGOC_E": pl.Int32,
            "L_POSPAGOL_E": pl.Int32,
            "L_NO_ESPECIFICADO_E": pl.Int64,
            "L_TOTAL_E": pl.Int32,
        },
        # Earlier comma-stripping expressions for the L_* columns were
        # disabled and are omitted here.
        "tweak_columns": [
            pl.col("FECHA").str.to_date(format="%d/%m/%Y"),
            pl.col("FOLIO").cast(pl.Int32),
        ],
    },
    "TD_TRAF_INTMOVIL_ITE_VA": {
        "schema": {
            "ANIO": pl.Int16,
            "MES": pl.Int8,
            "FECHA": pl.Date,
            # All traffic volume columns share the same dtype.
            **dict.fromkeys(
                (
                    "TRAF_TB_2G_E",
                    "TRAF_TB_3G_E",
                    "TRAF_TB_4G_E",
                    "TRAF_TB_NO_ESPECIFICADO_E",
                    "TOTAL_TB_E",
                ),
                pl.Float32,
            ),
        },
        # Earlier comma-stripping expressions for the TRAF_* columns and an
        # alternative FECHA parser (%d%b%Y / %d-%b-%y) were disabled and are
        # omitted here.
        "tweak_columns": [
            pl.col("FOLIO").cast(pl.Int32),
            pl.col("FECHA").str.to_date(format="%d/%m/%Y"),
        ],
    },
    "TD_IHH_INTMOVIL_ITE_VA": {
        "schema": {
            "ANIO": pl.Int16,
            "MES": pl.Int8,
            "IHH_INTMOVIL_E": pl.Int16,
        },
        "tweak_columns": [
            # Strip thousands separators so the later Int16 cast can parse it.
            pl.col("IHH_INTMOVIL_E").str.replace_all(",", ""),
            pl.col("FECHA").str.to_date(format="%d/%m/%Y"),
        ],
    },
    "TD_MARKET_SHARE_INTMOVIL_ITE_VA": {
        "schema": {
            "ANIO": pl.Int16,
            "MES": pl.Int8,
        },
        "tweak_columns": [
            # Drop the "%" sign before converting the share to a float.
            pl.col("MARKET_SHARE").str.replace_all("%", "").cast(pl.Float32),
            pl.col("FECHA").str.to_date(format="%d/%m/%Y"),
        ],
    },
}

def tablas(name_tabla):
    """Download the table ``name_tabla`` and clean it with its configured rules.

    The raw frame comes from ``download_files``; the expressions and schema
    come from the ``diccionario_datos`` entry with the same key and are
    applied through ``tweak_df``.
    """
    raw = download_files(name_tabla)
    rules = diccionario_datos[name_tabla]
    return tweak_df(raw, rules['tweak_columns'], rules['schema'])

# Download and clean each IFT table used further down in the notebook.
lineas = tablas(name_tabla='TD_LINEAS_INTMOVIL_ITE_VA')                # lines per operator
traf = tablas(name_tabla='TD_TRAF_INTMOVIL_ITE_VA')                    # traffic volumes
ihh = tablas(name_tabla='TD_IHH_INTMOVIL_ITE_VA')                      # IHH index
market_share = tablas(name_tabla='TD_MARKET_SHARE_INTMOVIL_ITE_VA')    # market share per group

In [4]:

# Build the table that will be used for the model.

# Groups with at least one MARKET_SHARE reading >= 1.0, with those readings summed.
# NOTE(review): "%" is stripped at load time, so 1.0 presumably means 1 percent - confirm.
tot_market_share = (
    market_share
    .filter(pl.col('MARKET_SHARE') >= 1.0)
    .group_by('K_GRUPO')
    .agg(pl.col('MARKET_SHARE').sum())
)

# Plain list of the selected group keys. `is_in` documents Expr / collection /
# Series inputs, so pass an explicit list instead of a one-column DataFrame.
grupos_top = tot_market_share.get_column('K_GRUPO').to_list()

maket_share_filtered_top8 = (
    market_share
    .select('FECHA', 'K_GRUPO', 'MARKET_SHARE')
    .filter(pl.col('MARKET_SHARE') >= 1.0)
)

lineas_filtered = lineas.select(
    'FECHA', 'K_GRUPO', 'GRUPO', 'K_EMPRESA', 'EMPRESA', 'CONCESIONARIO',
    'L_PREPAGO_E', 'L_POSPAGO_E', 'L_POSPAGOC_E', 'L_POSPAGOL_E',
    'L_NO_ESPECIFICADO_E', 'L_TOTAL_E',
)
lineas_filtered_with_top8 = lineas_filtered.filter(pl.col('K_GRUPO').is_in(grupos_top))

traf_filtered = traf.select(
    'FECHA', 'K_GRUPO', 'GRUPO', 'K_EMPRESA', 'EMPRESA', 'CONCESIONARIO',
    'TRAF_TB_2G_E', 'TRAF_TB_3G_E', 'TRAF_TB_4G_E',
    'TRAF_TB_NO_ESPECIFICADO_E', 'TOTAL_TB_E',
)
traf_filtered_with_top8 = traf_filtered.filter(pl.col('K_GRUPO').is_in(grupos_top))

# NOTE(review): both joins match only on (FECHA, K_GRUPO) while traf and lineas
# also carry GRUPO / K_EMPRESA / EMPRESA / CONCESIONARIO. The second join therefore
# yields `_right`-suffixed duplicates of those columns, and rows will multiply if
# either table has several rows per (FECHA, K_GRUPO) - confirm this is intended.
first_join = maket_share_filtered_top8.join(traf_filtered_with_top8, on=['FECHA', 'K_GRUPO'], how='left')
second_join = first_join.join(lineas_filtered_with_top8, on=['FECHA', 'K_GRUPO'], how='left')

# Modelling subset: a single group.
sj_g003 = second_join.filter(pl.col('K_GRUPO') == 'G003')

In [7]:
##This is only a preview of how to work with mlflow.

from sklearn.linear_model import LinearRegression
from sklearn.feature_extraction import DictVectorizer
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import mlflow
from sklearn.pipeline import make_pipeline

# MLflow tracking server that the runs below report to (local instance).
TRACKING_URI = "http://localhost:5000"
mlflow.set_tracking_uri(TRACKING_URI)

def eval_metrics(actual, pred):
    """Return ``(rmse, mae, r2)`` for predictions ``pred`` against ``actual``.

    RMSE is the square root of ``mean_squared_error``; MAE and R2 come from
    ``mean_absolute_error`` and ``r2_score`` respectively.
    """
    mse = mean_squared_error(actual, pred)
    return np.sqrt(mse), mean_absolute_error(actual, pred), r2_score(actual, pred)

# Feature and target columns for the model.
categorical = []
numerical = ['L_NO_ESPECIFICADO_E','L_POSPAGOC_E','L_PREPAGO_E','L_TOTAL_E']
target = ['MARKET_SHARE']

# Keep only the model columns and replace missing values in the numerical
# features with 0 (one expression covers every column in `numerical`).
df = (
    sj_g003
    .select(numerical + categorical + target)
    .with_columns(pl.col(numerical).fill_null(0))
)

mlflow.sklearn.autolog()

# 80-20 train/test split. `test_size` is explicit because scikit-learn's default
# is 0.25, and `random_state` is fixed so a re-run produces the same split.
train, test = train_test_split(df, test_size=0.2, random_state=42)
train_dicts = train.select(numerical + categorical).to_dicts()
test_dicts = test.select(numerical + categorical).to_dicts()
y_train = train.select(target).to_numpy()
y_test = test.select(target).to_numpy()



In [8]:
with mlflow.start_run():
    # Vectorize the feature dicts, then fit a plain linear regression.
    pipeline = make_pipeline(DictVectorizer(), LinearRegression())
    pipeline.fit(train_dicts, y_train)

    # Score the held-out rows; the metrics stay available as notebook variables.
    y_pred = pipeline.predict(test_dicts)
    rmse, mae, r2 = eval_metrics(y_test, y_pred)

2024/08/15 13:05:25 INFO mlflow.tracking._tracking_service.client: 🏃 View run luminous-pug-984 at: http://localhost:5000/#/experiments/0/runs/eba233b9ace241f99e07c3198dcf86db.
2024/08/15 13:05:25 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://localhost:5000/#/experiments/0.
