In [2]:
import polars as pl
import os

In [3]:
def read_files(path):
    files_in_path = os.listdir(path)
    try:
        path_competitor = (
            path
            + "/"
            + [
                file
                for file in files_in_path
                if "concorrente" in file or "competitor" in file
            ][0]
        )
        path_content = (
            path
            + "/"
            + [
                file
                for file in files_in_path
                if "conteudo" in file or "content" in file or "conteúdo" in file
            ][0]
        )
        path_followers = (
            path
            + "/"
            + [
                file
                for file in files_in_path
                if "seguidor" in file or "followers" in file
            ][0]
        )
        path_visitors = (
            path
            + "/"
            + [
                file
                for file in files_in_path
                if "visitante" in file or "visitors" in file
            ][0]
        )
    except Exception as e:
        print("Erro ao encontrar arquivos de entrada. Verifique o diretório.")
        print(str(e))
        exit()

    sheets = {
        "content_metrics": {
            "path": path_content,
            "sheet": 1,
            "skip_rows": 0,
            "super_header": True,
        },
        "content_posts": {
            "path": path_content,
            "sheet": 2,
            "super_header": True,
        },
        "followers_new": {"path": path_followers, "sheet": 1},
        "followers_location": {"path": path_followers, "sheet": 2},
        "followers_function": {"path": path_followers, "sheet": 3},
        "followers_experience": {"path": path_followers, "sheet": 4},
        "followers_industry": {"path": path_followers, "sheet": 5},
        "followers_company_size": {"path": path_followers, "sheet": 6},
        "visitors_metrics": {"path": path_visitors, "sheet": 1},
        "visitors_location": {"path": path_visitors, "sheet": 2},
        "visitors_function": {"path": path_visitors, "sheet": 3},
        "visitors_experience": {"path": path_visitors, "sheet": 4},
        "visitors_industry": {"path": path_visitors, "sheet": 5},
        "visitors_company_size": {"path": path_visitors, "sheet": 6},
        "competitors": {"path": path_competitor, "skip_rows": 1},
    }

    for sheet_name, sheet in sheets.items():
        if "soujunior_extração_conteúdo_01_23-01_24" in sheet["path"]:
            sheet["df"] = pl.read_excel(
                source=sheet["path"],
                sheet_id=sheet.get("sheet", 1),
                read_options={"skip_rows": 1},
                infer_schema_length=10000,
                # schema_overrides={"Data": pl.Date},
            )
            continue

        elif "soujunior_extração_concorrentes_01_23-01_24" in sheet["path"]:
            sheet["df"] = pl.read_csv(
                source=sheet["path"], infer_schema_length=10000, skip_rows=1
            )

        elif ".xls" in sheet["path"]:
            sheet["df"] = pl.read_excel(
                source=sheet["path"],
                sheet_id=sheet.get("sheet", 1),
                read_options={"skip_rows": sheet.get("skip_rows", 0)},
                # schema_overrides={"Data": pl.Date},
            )

        if "super_header" in sheet:
            first_row = sheet["df"].row(0)
            sheet["df"].columns = first_row
            sheet["df"] = sheet["df"].slice(1, sheet["df"].height)

        if sheet_name == "competitors":
            sheet["df"] = sheet["df"].select(sheet["df"].columns[:5])

    return sheets

In [4]:
def clear_and_transform(sheets):
    for sheet_name, sheet in sheets.items():
        sheet["df"] = model_data(sheet["df"], category=sheet_name)

    sheets, final_date = process_content_metrics(sheets)

    sheets = process_aggregate_date_dataframes(sheets, final_date)

    return sheets


def get_last_date(df):
    return df["Date"].max().date()


def model_data(df, category):
    """
    Traduz colunas e infere tipos
    """
    category_atributes = {
        "content_metrics": {
            "Date": pl.String,  # temp
            "Impressions (organic)": pl.Int64,
            "Impressions (sponsored)": pl.Int64,
            "Impressions (total)": pl.Int64,
            "Unique impressions (organic)": pl.Int64,
            "Clicks (organic)": pl.Int64,
            "Clicks (sponsored)": pl.Int64,
            "Clicks (total)": pl.Int64,
            "Reactions (organic)": pl.Int64,
            "Reactions (sponsored)": pl.Int64,
            "Reactions (total)": pl.Int64,
            "Comments (organic)": pl.Int64,
            "Comments (sponsored)": pl.Int64,
            "Comments (total)": pl.Int64,
            "Shares (organic)": pl.Int64,
            "Shares (sponsored)": pl.Int64,
            "Shares (total)": pl.Int64,
            "Engagement rate (organic)": pl.Float64,
            "Engagement rate (sponsored)": pl.Float64,
            "Engagement rate (total)": pl.Float64,
        },
        "content_posts": {
            "Post Title": pl.String,
            "Post Link": pl.String,
            "Post Type": pl.String,
            "Campaign Name": pl.String,
            "Published by": pl.String,
            "Date": pl.String,  # temp
            "Campaign Start Date": pl.String,  # temp
            "Campaign End Date": pl.String,  # temp
            "Audience": pl.String,
            "Impressions": pl.Int64,
            "Views (excluding off-site video views)": pl.Int64,
            "Off-site Views": pl.Int64,
            "Clicks": pl.Int64,
            "Click-Through Rate (CTR)": pl.Float32,
            "Likes": pl.Int64,
            "Comments": pl.Int64,
            "Shares": pl.Int64,
            "Followers": pl.Int64,
            "Engagement Rate": pl.Float32,
            "Content Type": pl.String,
        },
        "followers_new": {
            "Date": pl.String,  # temp
            "Followers Sponsored": pl.Int64,
            "Followers Organic": pl.Int64,
            "Total Followers": pl.Int64,
        },
        "followers_location": {"Location": pl.String, "Total Followers": pl.Int64},
        "followers_function": {"Function": pl.String, "Total Followers": pl.Int64},
        "followers_experience": {
            "Experience Level": pl.String,
            "Total Followers": pl.Int64,
        },
        "followers_industry": {"Industry": pl.String, "Total Followers": pl.Int64},
        "followers_company_size": {
            "Company Size": pl.String,
            "Total Followers": pl.Int64,
        },
        "visitors_metrics": {
            "Date": pl.String,  # temp
            "Page Views Overview (Desktop)": pl.Int64,
            "Page Views Overview (Mobile Devices)": pl.Int64,
            "Page Views Overview (Total)": pl.Int64,
            "Unique Visitors Overview (Desktop)": pl.Int64,
            "Unique Visitors Overview (Mobile Devices)": pl.Int64,
            "Unique Visitors Overview (Total)": pl.Int64,
            "Page Views Day by Day (Desktop)": pl.Int64,
            "Page Views Day by Day (Mobile Devices)": pl.Int64,
            "Page Views Day by Day (Total)": pl.Int64,
            "Unique Visitors Day by Day (Desktop)": pl.Int64,
            "Unique Visitors Day by Day (Mobile Devices)": pl.Int64,
            "Unique Visitors Day by Day (Total)": pl.Int64,
            "Page Views Jobs (Desktop)": pl.Int64,
            "Page Views Jobs (Mobile Devices)": pl.Int64,
            "Page Views Jobs (Total)": pl.Int64,
            "Unique Visitors Jobs (Desktop)": pl.Int64,
            "Unique Visitors Jobs (Mobile Devices)": pl.Int64,
            "Unique Visitors Jobs (Total)": pl.Int64,
            "Total Page Views (Desktop)": pl.Int64,
            "Total Page Views (Mobile Devices)": pl.Int64,
            "Total Page Views (Total)": pl.Int64,
            "Total Unique Visitors (Desktop)": pl.Int64,
            "Total Unique Visitors (Mobile Devices)": pl.Int64,
            "Total Unique Visitors (Total)": pl.Int64,
        },
        "visitors_location": {"Location": pl.String, "Total Views": pl.Int64},
        "visitors_function": {"Function": pl.String, "Total Views": pl.Int64},
        "visitors_experience": {"Experience Level": pl.String, "Total Views": pl.Int64},
        "visitors_industry": {"Industry": pl.String, "Total Views": pl.Int64},
        "visitors_company_size": {"Company Size": pl.String, "Total Views": pl.Int64},
        "competitors": {
            "Page": pl.String,
            "Total Followers": pl.Int64,
            "New Followers": pl.Int64,
            "Total Post Engagements": pl.Float32,
            "Total Posts": pl.Int64,
        },
    }
    category_atribute = category_atributes.get(category)

    translated_columns = list(category_atribute.keys())

    df.columns = translated_columns

    df = df.cast(category_atribute)

    date_columns = {
        "content_metrics": ["Date"],
        "content_posts": ["Date", "Campaign Start Date", "Campaign End Date"],
        "followers_new": ["Date"],
        "visitors_metrics": ["Date"],
    }



    if category in date_columns.keys():
        for col in date_columns[category]:

            df = df.with_columns(pl.col(col).str.to_date("%m/%d/%Y"))

    return df


def process_content_metrics(sheets):

    df = sheets["content_metrics"]["df"]

    df = df.with_columns(
        pl.when(pl.col("Reactions (total)") >= 0)
        .then(pl.col("Reactions (total)"))
        .otherwise(pl.lit(0))
        .alias("Reactions (positive)"),
        pl.when(pl.col("Comments (total)") >= 0)
        .then(pl.col("Comments (total)"))
        .otherwise(pl.lit(0))
        .alias("Comments (positive)"),
        pl.when(pl.col("Shares (total)") >= 0)
        .then(pl.col("Shares (total)"))
        .otherwise(pl.lit(0))
        .alias("Shares (positive)"),
        pl.when(pl.col("Clicks (total)") >= 0)
        .then(pl.col("Clicks (total)"))
        .otherwise(pl.lit(0))
        .alias("Clicks (positive)"),
    )

    df = df.with_columns(
        (pl.col("Reactions (positive)"))
        .rolling_mean(window_size=3)
        .alias("Reactions (moving average)"),
        (pl.col("Comments (positive)"))
        .rolling_mean(window_size=3)
        .alias("Comments (moving average)"),
        (pl.col("Shares (positive)"))
        .rolling_mean(window_size=3)
        .alias("Shares (moving average)"),
        (pl.col("Clicks (positive)"))
        .rolling_mean(window_size=3)
        .alias("Clicks (moving average)"),
    )

    df = df.with_columns(
        pl.when(pl.col("Reactions (total)") >= 0)
        .then(pl.col("Reactions (total)"))
        .otherwise("Reactions (moving average)")
        .alias("Reactions (final)"),
        pl.when(pl.col("Comments (total)") >= 0)
        .then(pl.col("Comments (total)"))
        .otherwise("Comments (moving average)")
        .alias("Comments (final)"),
        pl.when(pl.col("Shares (total)") >= 0)
        .then(pl.col("Shares (total)"))
        .otherwise("Shares (moving average)")
        .alias("Shares (final)"),
        pl.when(pl.col("Clicks (total)") >= 0)
        .then(pl.col("Clicks (total)"))
        .otherwise("Clicks (moving average)")
        .alias("Clicks (final)"),
    )

    engagement_sum = (
        pl.col("Reactions (final)")
        + pl.col("Comments (final)")
        + pl.col("Clicks (final)")
        + pl.col("Shares (final)")
    )

    df = df.with_columns(
        (engagement_sum / pl.col("Impressions (total)")).alias(
            "Engagement rate (calculed)"
        )
    )

    df_final = df.select(
        [
            "Date",
            "Impressions (total)",
            "Reactions (final)",
            "Comments (final)",
            "Clicks (final)",
            "Shares (final)",
            "Engagement rate (calculed)",
        ]
    )
    df_final.columns = [
        "Date",
        "Impressions (total)",
        "Reactions (total)",
        "Comments (total)",
        "Clicks (total)",
        "Shares (total)",
        "Engagement rate (total)",
    ]
    final_date = df_final.select(pl.col("Date").max()).to_series()[0]

    sheets["content_metrics"]["df"] = df_final

    return sheets, final_date


def process_aggregate_date_dataframes(sheets, final_date):

    aggregate_date_dataframes = [
        "competitors",
        "followers_company_size",
        "followers_experience",
        "followers_function",
        "followers_industry",
        "followers_location",
        "visitors_company_size",
        "visitors_experience",
        "visitors_function",
        "visitors_industry",
        "visitors_location",
    ]

    for sheet_name, sheet in sheets.items():
        if sheet_name in aggregate_date_dataframes:
            sheet["df"] = sheet["df"].with_columns(
                pl.lit(final_date, allow_object=True).alias("Date")
            )

    return sheets

In [5]:
def merge_dfs(df1, df2, category):
    key = {
        "content_metrics": "Date",
        "content_posts": "Post Link",
        "followers_new": "Date",
        "followers_company_size": "Company Size",
        "followers_experience": "Experience Level",
        "followers_function": "Function",
        "followers_industry": "Industry",
        "followers_location": "Location",
        "visitors_metrics": "Date",
        "visitors_company_size": "Company Size",
        "visitors_experience": "Experience Level",
        "visitors_function": "Function",
        "visitors_industry": "Industry",
        "visitors_location": "Location",
        "competitors": "Page",

    }
    df_merged = df2.merge_sorted(df1, key=key.get(category))
    return df_merged

In [6]:
def merge(extractions):
    final_dfs = {}
    for sheet_name, sheet in extractions[0].items():
        i = 1
        merged_df = sheet["df"]
        while i < len(extractions):
            if sheet_name in extractions[i].keys():
                merged_df = merge_dfs(merged_df, extractions[i][sheet_name]["df"], category=sheet_name)
            i+=1
        
        final_dfs[sheet_name] = merged_df
            
    return final_dfs

In [9]:
def export_dfs(final_dfs):
    
    output_path = "processed/365d/"
    for sheet_name, sheet in final_dfs.items():
        sheet.write_csv(output_path + sheet_name + ".csv", quote_style="always")


In [10]:
# path = "raw/365d/04_23-04_24"
folders = os.listdir("raw/365d")
paths = ["raw/365d/" + folder for folder in folders]

extractions = []
for path in paths:
    print("Processing:", path)
    sheets = read_files(path)

    final_sheets = clear_and_transform(sheets)
    extractions.append(final_sheets)

final_dfs = merge(extractions)

export_dfs(final_dfs)

final_dfs

Processing: raw/365d/01_23-01_24
Processing: raw/365d/03_23-03_24
Processing: raw/365d/04_23-04_24


{'content_metrics': shape: (1_092, 7)
 ┌────────────┬─────────────┬────────────────────┬──────────┬─────────┬─────────┬───────────────────┐
 │ Date       ┆ Impressions ┆ Reactions (total)  ┆ Comments ┆ Clicks  ┆ Shares  ┆ Engagement rate   │
 │ ---        ┆ (total)     ┆ ---                ┆ (total)  ┆ (total) ┆ (total) ┆ (total)           │
 │ date       ┆ ---         ┆ f64                ┆ ---      ┆ ---     ┆ ---     ┆ ---               │
 │            ┆ i64         ┆                    ┆ f64      ┆ f64     ┆ f64     ┆ f64               │
 ╞════════════╪═════════════╪════════════════════╪══════════╪═════════╪═════════╪═══════════════════╡
 │ 2023-01-17 ┆ 7335        ┆ 134.0              ┆ 10.0     ┆ 291.0   ┆ 5.0     ┆ 0.059986          │
 │ 2023-01-18 ┆ 4436        ┆ 48.0               ┆ 4.0      ┆ 165.0   ┆ 0.0     ┆ 0.048918          │
 │ 2023-01-19 ┆ 8411        ┆ 41.0               ┆ 6.0      ┆ 391.0   ┆ 4.0     ┆ 0.05255           │
 │ 2023-01-20 ┆ 13630       ┆ 109.0         

In [None]:
def concatenate(self, dfs, path_export):
    if len(os.listdir(path_export)) > 0:
        print("Clean data detected! Concatenating...")
        for df in dfs:
            df_clean = pd.read_csv(
                path_export + "\\" + df["name"] + ".csv", parse_dates=["Date"]
            )

            if df["name"] == "content_metrics":
                df["df"] = self.concat_dfs(df_clean, df["df"], drop_duplicates="Date")
            elif df["name"] == "content_posts":
                df["df"] = self.concat_dfs(
                    df_clean, df["df"], drop_duplicates="Post Link"
                )
            elif df["name"] == "followers_new":
                df["df"] = self.concat_dfs(df_clean, df["df"], drop_duplicates="Date")
            elif df["name"] == "visitors_metrics":
                df["df"] = self.concat_dfs(df_clean, df["df"], drop_duplicates="Date")
            else:
                df["df"] = self.concat_dfs(df_clean, df["df"])
    else:
        print("No data to concatenate! Continuing...")

    return dfs


def concat_dfs(
    self,
    df1,
    df2,
    drop_duplicates=False,
):
    df1["Date"] = pd.to_datetime(df1["Date"])
    df2["Date"] = pd.to_datetime(df2["Date"])
    df = pd.concat([df1, df2])

    if drop_duplicates:
        df = df.drop_duplicates(subset=[drop_duplicates])

    df["Date"] = pd.to_datetime(df["Date"]).dt.date
    df = df.sort_values(by="Date", ascending=False)

    return df