In [213]:
from typing import List
import os

from pyspark.sql import DataFrame
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql.functions import col

from pyspark.sql import SparkSession, SQLContext

In [5]:
# Função de Transformação do trex-dags


def transform_columns_to_timestamp(
    df: DataFrame, timestamp_fields: List[str]
) -> DataFrame:
    for timestamp_field in timestamp_fields:
        df = df.withColumn(f"{timestamp_field}_ts", to_timestamp(timestamp_field))
        df = df.drop(timestamp_field)
        df = df.withColumnRenamed(f"{timestamp_field}_ts", timestamp_field)

    return df

In [34]:
MAX_MEMORY = "5g"

spark = (
    SparkSession.builder.appName("Pagar.me Data Cluster")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

In [9]:
input_path = os.path.join(".", "pagarme")
output_path = os.path.join(".", "output", "processed_files")

if os.path.isdir(output_path):
    os.rmdir(output_path)

timestamp_fields = [
    "created_at",
    "updated_at",
    "local_time",
    #     "pix_expiration_date"
]

In [10]:
df = spark.read.parquet(input_path)

In [11]:
# Show columns before transforming in to datetime
[(i, v) for i, v in df.dtypes if i in timestamp_fields]

[('created_at', 'string'), ('updated_at', 'string'), ('local_time', 'string')]

In [12]:
transformed_df = transform_columns_to_timestamp(df, timestamp_fields)

In [13]:
transformed_df.count()

                                                                                

213439422

In [19]:
df_acima = (
    transformed_df.filter(transformed_df.boleto_expiration_date.isNotNull())
    .filter(col("boleto_expiration_date") > "2030-01-01 00:00:00")
    .select(
        "id", "boleto_expiration_date", "created_at", "payment_method", "company_id"
    )
    .sort(col("boleto_expiration_date").desc(), col("created_at").desc())
)
#     .show(100, truncate=False)

In [20]:
df_acima.count()

                                                                                

1657

In [21]:
df_abaixo = (
    transformed_df.filter(transformed_df.boleto_expiration_date.isNotNull())
    .filter(col("boleto_expiration_date") < "2010-01-01 00:00:00")
    .select(
        "id", "boleto_expiration_date", "created_at", "payment_method", "company_id"
    )
    .sort(col("boleto_expiration_date").desc(), col("created_at").desc())
)
#     .show(100, truncate=False)

In [22]:
df_abaixo.count()

                                                                                

1285

In [24]:
df_final = df_acima.union(df_abaixo)

In [25]:
df_final.count()

                                                                                

2942

In [26]:
df_final.toPandas().to_csv("result.csv")

                                                                                

In [175]:
from datetime import timedelta, datetime

limit_sup = datetime.now() + timedelta(days=3650)
limit_inf = "2010-01-01 00:00:00"

limit_sup = str(limit_sup)
limit_sup

'2031-08-30 17:20:27.271568'

In [176]:
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import udf

# maturity_udf = udf(lambda age: "adult" if age >=18 else "child", StringType())


age_udf = udf(lambda x: None if x > limit_sup or x < limit_inf else x, StringType())

In [177]:
df = spark.createDataFrame(
    [
        {"boleto_expiration_date": "0001-01-01 03:06:28.000000"},
        {"boleto_expiration_date": "8062-06-30 03:00:00.000000"},
        {"boleto_expiration_date": "1900-06-30 03:00:00.000000"},
        {"boleto_expiration_date": "2021-09-27 03:00:00.000000"},
        {"boleto_expiration_date": "0000-00-00 00:00:00.000000"},
    ]
)
df.show()

+----------------------+
|boleto_expiration_date|
+----------------------+
|  0001-01-01 03:06:...|
|  8062-06-30 03:00:...|
|  1900-06-30 03:00:...|
|  2021-09-27 03:00:...|
|  0000-00-00 00:00:...|
+----------------------+



In [158]:
df2 = df.withColumn("boleto_expiration_date", age_udf(df.boleto_expiration_date))
df2.show()

+----------------------+
|boleto_expiration_date|
+----------------------+
|                  null|
|                  null|
|                  null|
|  2021-09-27 03:00:...|
|                  null|
+----------------------+



In [152]:
df_gg = transform_columns_to_timestamp(df2, ["boleto_expiration_date"])
df_gg.show()

+----------------------+
|boleto_expiration_date|
+----------------------+
|                  null|
|                  null|
|                  null|
|   2021-09-27 03:00:00|
|                  null|
+----------------------+



In [178]:
TIMESTAM_MAX = str(datetime.now() + timedelta(days=3650))
TIMESTAM_MIN = "2010-01-01 00:00:00"

In [179]:
def clean_timestamp_columns(
    df: DataFrame,
    timestamp_fields: List[str],
) -> DataFrame:
    #     log.info(f"Cleaning timestamp columns {timestamp_fields}")
    timestamp_validation_udf = udf(
        lambda x: None if x > TIMESTAM_MAX or x < TIMESTAM_MIN else x, StringType()
    )

    for timestamp_field in timestamp_fields:
        df = df.withColumn(
            timestamp_field, timestamp_validation_udf(df[timestamp_field])
        )

    return df

In [184]:
df.show()

+----------------------+
|boleto_expiration_date|
+----------------------+
|  0001-01-01 03:06:...|
|  8062-06-30 03:00:...|
|  1900-06-30 03:00:...|
|  2021-09-27 03:00:...|
|  0000-00-00 00:00:...|
+----------------------+



In [180]:
dfhh = clean_timestamp_columns(df, ["boleto_expiration_date"])

In [181]:
dfhh.show()

+----------------------+
|boleto_expiration_date|
+----------------------+
|                  null|
|                  null|
|                  null|
|  2021-09-27 03:00:...|
|                  null|
+----------------------+



In [199]:
df = spark.createDataFrame(
    [
        {
            "boleto_expiration_date": "0001-01-01 03:06:28.000000",
            "created_at": "2021-09-01 03:06:28.000000",
        },
        {
            "boleto_expiration_date": "8062-06-30 03:00:00.000000",
            "created_at": "2021-09-01 03:06:28.000000",
        },
        {
            "boleto_expiration_date": "1900-06-30 03:00:00.000000",
            "created_at": "2021-09-01 03:06:28.000000",
        },
        {
            "boleto_expiration_date": "2021-09-27 03:00:00.000000",
            "created_at": "2021-09-01 03:06:28.000000",
        },
        {
            "boleto_expiration_date": "0000-00-00 00:00:00.000000",
            "created_at": "2021-09-01 03:06:28.000000",
        },
    ]
)
df.show()

+----------------------+--------------------+
|boleto_expiration_date|          created_at|
+----------------------+--------------------+
|  0001-01-01 03:06:...|2021-09-01 03:06:...|
|  8062-06-30 03:00:...|2021-09-01 03:06:...|
|  1900-06-30 03:00:...|2021-09-01 03:06:...|
|  2021-09-27 03:00:...|2021-09-01 03:06:...|
|  0000-00-00 00:00:...|2021-09-01 03:06:...|
+----------------------+--------------------+



In [218]:
convert = lambda string: datetime.strptime(
    "2021-09-01 03:06:28.00", "%Y-%m-%d %H:%M:%S.%f"
)

created_at_datetime = convert("2021-09-01 03:06:28.00")
created_at_datetime

datetime.datetime(2021, 9, 1, 3, 6, 28)

In [227]:
from datetime import datetime, timedelta


@udf(returnType=StringType())
def clean_boleto(created_at, boleto_field):
    created_at_datetime = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S.%f")

    limit_sup_date = str(created_at_datetime + timedelta(days=3_650))

    condition_boleto_after = created_at > boleto_field
    condition_boleto_exceed_limit = boleto_field > limit_sup_date

    if condition_boleto_after or condition_boleto_exceed_limit:
        return None

    return boleto_field


def clean_boleto_columns(
    df: DataFrame,
    boleto_fields: List[str],
) -> DataFrame:

    for boleto_field in boleto_fields:
        df = df.withColumn(
            boleto_field,
            clean_boleto(
                df["created_at"],
                df[boleto_field],
            ),
        )

    return df

In [228]:
clean_boleto_columns(df, ["boleto_expiration_date"]).show()

+----------------------+--------------------+
|boleto_expiration_date|          created_at|
+----------------------+--------------------+
|                  null|2021-09-01 03:06:...|
|                  null|2021-09-01 03:06:...|
|                  null|2021-09-01 03:06:...|
|  2021-09-27 03:00:...|2021-09-01 03:06:...|
|                  null|2021-09-01 03:06:...|
+----------------------+--------------------+



In [229]:
df.show()

+----------------------+--------------------+
|boleto_expiration_date|          created_at|
+----------------------+--------------------+
|  0001-01-01 03:06:...|2021-09-01 03:06:...|
|  8062-06-30 03:00:...|2021-09-01 03:06:...|
|  1900-06-30 03:00:...|2021-09-01 03:06:...|
|  2021-09-27 03:00:...|2021-09-01 03:06:...|
|  0000-00-00 00:00:...|2021-09-01 03:06:...|
+----------------------+--------------------+

