In [1]:
import itertools
import os
from datetime import datetime, timedelta, time

import pandas as pd
import pendulum
import pytz
import requests
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from google.cloud import bigquery



In [2]:
# --- BigQuery destination ---------------------------------------------------
# Service-account key file (path is relative to the working directory).
_google_key = "../../Credenciales/servacc_bigquery.json"
# Fully qualified table that receives the accumulated figures.
bq_table = "conexion-datos-rdf.audio_digital.acumulado_diario"

# --- Time handling ----------------------------------------------------------
utc = pytz.utc
cl = pytz.timezone("America/Santiago")
local_tz = pendulum.timezone("America/Santiago")

# strftime patterns: timestamp with seconds, and date only.
fmt = '%Y-%m-%d %H:%M:%S'
fmt_d = '%Y-%m-%d'

# Size, in days, of the trailing window that is deleted and downloaded again.
dias_reemplazo = 10

# Reference "now" in Santiago time, fixed at the moment this cell runs.
now_dt = datetime.now(cl)
# now_dt = datetime(2022, 7, 1)  # uncomment to pin the reference date by hand

# --- Mediastream identifiers, keyed by site ---------------------------------
# Value sent in the first `group` filter of every request.
grupo = {
    "horizonte.cl": "3570670d5064d58ee8ccdd3650506e3a",
    "oasisfm.cl": "35bfeb595ed83ebec22c1e3b5ed28f7f",
    "playfm.cl": "06c2d59cd238c5848572bc1874acb044",
    "sonarfm.cl": "2757a2c258842237e149c98e55073b31",
    "tele13radio.cl": "5ae0b4ae44d45dbbaef992ea99512079",
}

# Value sent in the `content_id` filter of every request.
content_live = {
    "horizonte.cl": "601415b58308405b0d11c82a",
    "oasisfm.cl": "5c915497c6fd7c085b29169d",
    "playfm.cl": "5c8d6406f98fbf269f57c82c",
    "sonarfm.cl": "5c915724519bce27671c4d15",
    "tele13radio.cl": "5c915613519bce27671c4caa",
}

# Content types iterated per site; every site currently has the single
# placeholder entry `None`, each in its own list.
content_types = {
    soporte: [None]
    for soporte in (
        "horizonte.cl",
        "oasisfm.cl",
        "playfm.cl",
        "sonarfm.cl",
        "tele13radio.cl",
    )
}

# VIP level -> value sent in the second `group` filter of every request.
stream_vip = {
    5: "b3583f38d358a82ecb3b6783664f1305",
}





In [3]:
def sesion_platform():
    """Log in to the Mediastream platform and return what a metrics call needs.

    The credentials are read from the ``MEDIASTREAM_USERNAME`` and
    ``MEDIASTREAM_PASSWORD`` environment variables so that no secret lives in
    the notebook.

    Returns:
        tuple: ``(headers, endpoint)``; ``headers`` carries the JWT under
        ``X-API-Token`` and ``endpoint`` is the metrics API URL.

    Raises:
        RuntimeError: if either credential environment variable is missing.
        KeyError: if the platform response carries no ``jwt`` cookie.
    """
    # NOTE(review): a password used to be committed in this function; it
    # should be rotated on the platform side.
    username = os.environ.get("MEDIASTREAM_USERNAME")
    password = os.environ.get("MEDIASTREAM_PASSWORD")
    if not username or not password:
        raise RuntimeError(
            "Faltan las credenciales de Mediastream: defina las variables de "
            "entorno MEDIASTREAM_USERNAME y MEDIASTREAM_PASSWORD."
        )

    login_platform = {
        "username": username,
        "password": password,
        "withJWT": "true",
        "login": "Login",
    }

    # The context manager closes the pooled connections once the token has
    # been read; the token itself is a plain string and stays usable.
    with requests.session() as session:
        session.post("https://platform.mediastre.am/login",
                     data=login_platform)
        r = session.get('https://platform.mediastre.am/analytics/now')
        cookies = r.cookies.get_dict()

    if "jwt" not in cookies:
        # Same exception type as a bare lookup, but with an actionable message.
        raise KeyError(
            "jwt: la plataforma no devolvió el token; revise las credenciales")

    headers = {"X-API-Token": cookies["jwt"]}
    endpoint = "https://metrics.mdstrm.com/outbound/v1/metric/api"

    return headers, endpoint



In [4]:
def del_current_data():
    """Delete the most recent rows of the BigQuery table.

    Removes every row of ``bq_table`` whose ``fecha_termino`` is on or after
    ``now_dt`` minus ``dias_reemplazo`` days, then prints the job result.
    """
    # First day of the window to wipe, rendered with the date-only pattern.
    fecha_corte = now_dt - timedelta(days=dias_reemplazo)
    f_inicio_consulta = fecha_corte.date().strftime(fmt_d)

    sentencia = """
        DELETE FROM `{0}`
        WHERE fecha_termino >= '{1}'
        """.format(bq_table, f_inicio_consulta)

    client = bigquery.Client.from_service_account_json(_google_key)
    update_job_d = client.query(sentencia)

    print('Borrando periodo diario desde: ' + f_inicio_consulta)
    # result() blocks until the DELETE job has finished.
    print(update_job_d.result())

In [5]:
def gen_array_dias() -> list:
    """Build the list of date ranges to download from the Mediastream API.

    One range is produced per day, from ``dias_reemplazo`` days ago up to
    today. Each range starts at Santiago midnight of the first day of the
    month and ends at Santiago midnight of the day itself; a range that ends
    on the 1st starts on the 1st of the previous month instead.

    Returns:
        list: ``[inicio, fin]`` pairs of UTC timestamps formatted with ``fmt``.
    """
    # Start and end of the window to query.
    dt_termino = datetime.now(cl)
    fe_inicio = (dt_termino - timedelta(days=dias_reemplazo)).date()
    # localize() pins the naive midnight to Santiago. Calling .astimezone()
    # on a naive datetime would interpret it in the machine's own timezone
    # and shift every boundary on a host that is not set to Santiago.
    dt_inicio = cl.localize(datetime.combine(fe_inicio, time.min))

    rangos = []
    for dia in pd.date_range(dt_inicio, dt_termino, freq="1D"):
        # A range closing on the 1st accumulates the previous month, so the
        # month is taken from the day before.
        ancla = dia - timedelta(days=1) if dia.day == 1 else dia
        primer_dia = ancla.replace(day=1).date()
        inicio = cl.localize(datetime.combine(primer_dia, time.min))

        f_inicio = inicio.astimezone(utc).strftime(fmt)
        f_fin = dia.astimezone(utc).strftime(fmt)
        rangos.append([f_inicio, f_fin])

    # List of ranges to query.
    return rangos

# Preview the ranges produced for the current date (displayed as the cell output).
gen_array_dias()



[['2022-07-01 04:00:00', '2022-07-29 04:00:00'],
 ['2022-07-01 04:00:00', '2022-07-30 04:00:00'],
 ['2022-07-01 04:00:00', '2022-07-31 04:00:00'],
 ['2022-07-01 04:00:00', '2022-08-01 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-02 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-03 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-04 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-05 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-06 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-07 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-08 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-09 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-10 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-11 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-12 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-13 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-14 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-15 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-16 04:00:00'],
 ['2022-08-01 04:00:00', '2022-08-17 04:00:00'],
 ['2022-08-01 04:00:

In [41]:
def generar_req(fechas: list, soporte: str) -> dict:
    """Assemble the request body for one metrics query.

    Args:
        fechas: values for the ``date`` filter, paired with the ``>=`` and
            ``<=`` operators.
        soporte: site key used to look up ``grupo`` and ``content_live``.

    Returns:
        dict: the ``full_by_time`` request, truncated by month, with the
        date, group, content id and VIP-5 group filters.
    """
    filtros = [
        {"name": "date", "op": [">=", "<="], "value": fechas},
        {"name": "group", "value": [grupo[soporte]],
         "logic_op": "and", "order": 1},
        {"name": "content_id", "value": [content_live[soporte]],
         "logic_op": "and", "order": 2},
        # The VIP level is fixed to 5 here.
        {"name": "group", "value": [stream_vip[5]],
         "logic_op": "and", "order": 3},
    ]

    return {
        "name": "full_by_time",
        "dimension": ["content_live"],
        "filter": filtros,
        "calendar": {"type": "all"},
        "time": "0",
        "trunc": ["MONTH"],
    }



In [42]:
def response_to_dataframe(response, soporte, vip):
    """Turn a metrics API response into a filtered DataFrame.

    Args:
        response: object whose ``json()`` returns a mapping with a ``data``
            entry (the rows).
        soporte: site key; stored in the ``soporte`` column and used to pick
            the live-stream name to keep.
        vip: VIP level; stored in the ``vip`` column as ``"v<vip>"``.

    Returns:
        pandas.DataFrame: the untouched empty frame when there are no rows,
        otherwise the columns ``soporte``, ``content_type``, ``vip``,
        ``stream`` and ``device`` of the rows that pass the filter.
    """
    datos = pd.DataFrame(response.json()["data"])
    if datos.empty:
        return datos

    datos["soporte"] = soporte
    datos["vip"] = f"v{vip}"

    columnas = ["soporte", "content_type", "vip", "stream", "device"]

    # NOTE(review): this mask is only used for "emisorpodcasting.cl"; for
    # every other site the rows are chosen by `content_live` alone. Confirm
    # that the two filters are not meant to be combined.
    es_on_demand = datos["content_type"].isin(["OnDemand", "OPE"])
    if soporte == "emisorpodcasting.cl":
        return datos[es_on_demand].copy()[columnas]

    live_filter = {
        "horizonte.cl": "Horizonte",
        "oasisfm.cl": "Oasis FM",
        "playfm.cl": "Play FM",
        "sonarfm.cl": "Sonar FM",
        "tele13radio.cl": "Tele 13 Radio"
    }
    es_live = datos["content_live"].isin([None, live_filter[soporte]])

    return datos[es_live].copy()[columnas]




In [43]:
def descargar_data(soporte, fechas):
    """Download the metrics of one site for one date range.

    Args:
        soporte: site key used to look up ``content_types`` and to build the
            request.
        fechas: ``[inicio, fin]`` pair of naive UTC timestamp strings.

    Returns:
        pandas.DataFrame: rows sorted by ``soporte``, ``fecha_inicio``,
        ``fecha_termino`` and ``content_type`` with a fresh index, or an
        empty frame without columns when no query returned data.
    """
    headers, endpoint = sesion_platform()

    consultas = content_types[soporte]
    print("Descargando datos: " + soporte)

    # The range boundaries are stored as Santiago calendar dates; they are
    # the same for every query of this call, so compute them once.
    fecha_inicio = pd.to_datetime(fechas[0]).tz_localize(utc).tz_convert(cl).date()
    fecha_termino = pd.to_datetime(fechas[1]).tz_localize(utc).tz_convert(cl).date()

    frames = []
    for content_type, vip in itertools.product(consultas, stream_vip.keys()):
        fechas_con = fechas.copy()

        print("{0}, {1}, {2}, vip {3}".format(
            fechas_con, soporte, content_type, vip))

        req = generar_req(fechas_con, soporte)
        response = requests.post(endpoint, headers=headers, json=req)

        df = response_to_dataframe(response, soporte, vip)

        if df.empty:
            # Skip only this combination; returning here would throw away
            # the frames already collected for the other combinations.
            continue

        # assign() returns a new frame, so nothing is written onto a frame
        # that was derived from a slice.
        frames.append(df.assign(fecha_inicio=fecha_inicio,
                                fecha_termino=fecha_termino))

    if not frames:
        return pd.DataFrame()

    columnas = ["soporte", "fecha_inicio", "fecha_termino",
                "content_type", "vip", "stream", "device"]

    # Concatenate once instead of growing a frame inside the loop.
    df_soporte = (
        pd.concat(frames)
        .sort_values(by=["soporte", "fecha_inicio", "fecha_termino", "content_type"])
        [columnas]
        .reset_index(drop=True)
    )

    return df_soporte



In [44]:
def upload_to_bq(df):
    """Load a DataFrame into the BigQuery table ``bq_table``.

    The load job clusters by ``soporte``, ``content_type`` and ``vip`` and
    partitions by month on ``fecha_inicio``. An empty frame is skipped,
    because it has none of those columns and there is nothing to load.

    Args:
        df: frame to upload.
    """
    if df.empty:
        print("Sin datos para cargar en BigQuery; se omite la carga.")
        return

    client = bigquery.Client.from_service_account_json(_google_key)

    job_config = bigquery.LoadJobConfig(
        schema=[],
        clustering_fields=["soporte", "content_type", "vip"],
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.MONTH,
            field="fecha_inicio",
        )
    )

    job = client.load_table_from_dataframe(df, bq_table, job_config=job_config)

    # result() blocks until the load job has finished.
    print(job.result())



In [45]:
def _etl_soporte(soporte):
    """Download every date range for one site and load the result to BigQuery."""
    # Collect the per-range frames and concatenate once, instead of growing
    # a frame inside the loop.
    frames = [descargar_data(soporte, fecha) for fecha in gen_array_dias()]
    df_total = pd.concat(frames) if frames else pd.DataFrame()
    upload_to_bq(df_total)


def etl_horizonte():
    """Run the download-and-load cycle for horizonte.cl."""
    _etl_soporte("horizonte.cl")


def etl_oasis():
    """Run the download-and-load cycle for oasisfm.cl."""
    _etl_soporte("oasisfm.cl")


def etl_play():
    """Run the download-and-load cycle for playfm.cl."""
    _etl_soporte("playfm.cl")


def etl_sonar():
    """Run the download-and-load cycle for sonarfm.cl."""
    _etl_soporte("sonarfm.cl")


def etl_tele13radio():
    """Run the download-and-load cycle for tele13radio.cl."""
    _etl_soporte("tele13radio.cl")

In [52]:
#del_current_data()
#etl_horizonte()
#etl_oasis()
#etl_play()
#etl_sonar()
#etl_tele13radio()

Descargando datos: tele13radio.cl
['2022-07-01 04:00:00', '2022-07-28 04:00:00'], tele13radio.cl, None, vip 5
Descargando datos: tele13radio.cl
['2022-07-01 04:00:00', '2022-07-29 04:00:00'], tele13radio.cl, None, vip 5
Descargando datos: tele13radio.cl
['2022-07-01 04:00:00', '2022-07-30 04:00:00'], tele13radio.cl, None, vip 5
Descargando datos: tele13radio.cl
['2022-07-01 04:00:00', '2022-07-31 04:00:00'], tele13radio.cl, None, vip 5
Descargando datos: tele13radio.cl
['2022-07-01 04:00:00', '2022-08-01 04:00:00'], tele13radio.cl, None, vip 5
Descargando datos: tele13radio.cl
['2022-08-01 04:00:00', '2022-08-02 04:00:00'], tele13radio.cl, None, vip 5
Descargando datos: tele13radio.cl
['2022-08-01 04:00:00', '2022-08-03 04:00:00'], tele13radio.cl, None, vip 5
Descargando datos: tele13radio.cl
['2022-08-01 04:00:00', '2022-08-04 04:00:00'], tele13radio.cl, None, vip 5
Descargando datos: tele13radio.cl
['2022-08-01 04:00:00', '2022-08-05 04:00:00'], tele13radio.cl, None, vip 5
Descargand