In [11]:
import os
import json
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DateType,DecimalType
from pyspark.sql.functions import udf, current_date,  to_date, date_format, lit, col
from pyspark.sql.types import BinaryType
import uuid
import datetime

import pandas as pd


In [3]:
# Parameters cell: every setting starts out as None so it can be injected from
# outside (presumably a papermill-style parameter run — TODO confirm). Anything
# still None is given a local default by the configuration cell that follows.
connection_url = user = password = None
directory = bronze_directory = silver_directory = None
start_pattern = end_extension = notebook_name = None

In [4]:
# Local environment configuration (defaults for one Windows workstation).
# The JDBC driver jar, connection URL, user and password are read from the
# environment variables OJDBC_DRIVER_PATH / ORACLE_JDBC_URL / ORACLE_USER /
# ORACLE_PASSWORD when set, falling back to the hard-coded defaults below.
# FIXME: remove the hard-coded DBA password fallback and use a least-privileged
# account instead of SYSDBA for this load.
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_202'
# Prepend the JDK's bin directory to PATH (os.pathsep is ';' on Windows).
os.environ['PATH'] = (
    os.path.join(os.environ['JAVA_HOME'], 'bin') + os.pathsep + os.environ.get('PATH', '')
)
driver_path = os.environ.get(
    'OJDBC_DRIVER_PATH', "C:/ocDB/WINDOWS.X64_193000_db_home/jdbc/lib/ojdbc8.jar"
)
if connection_url is None:
    connection_url = os.environ.get('ORACLE_JDBC_URL', "jdbc:oracle:thin:@//localhost:1521/orcl")
if user is None:
    user = os.environ.get('ORACLE_USER', "sys as SYSDBA")
if password is None:
    password = os.environ.get('ORACLE_PASSWORD', "root")
# Input folder with the CEXT files and the bronze/silver output roots.
# The output roots are concatenated as strings later, so they keep a trailing '/'.
if directory is None:
    directory = "Volumes/dev/tiintegracao/team/cartoes/cext"
if bronze_directory is None:
    bronze_directory = "Volumes/bronze/"
if silver_directory is None:
    silver_directory = "Volumes/silver/"
# Only files named <start_pattern>...<end_extension> are picked up.
if start_pattern is None:
    start_pattern = "CEXT_756"
if end_extension is None:
    end_extension = ".CCB"
# Name recorded in the log entries.
if notebook_name is None:
    notebook_name = "TestePyspark.ipynb"

In [5]:
# Create (or reuse) the Spark session with the Oracle JDBC jar on the driver classpath.
spark = (
    SparkSession.builder
    .appName("Exemplo de Spark JDBC com Oracle")
    .config("spark.driver.extraClassPath", driver_path)
    .getOrCreate()
)

In [6]:
def add_uuid(df, col_name):
    """Return ``df`` with a column ``col_name`` holding a random UUID4 per row.

    Each value is the 16-byte binary form of the UUID (``uuid.UUID.bytes``),
    typed as ``BinaryType``.

    The UDF is marked non-deterministic so Spark's optimizer does not assume
    it may freely duplicate or collapse its invocations. The column is still
    recomputed on every action (``show``, ``write``, ``toPandas`` ...), so
    materialize the result (e.g. checkpoint) before using it more than once
    when all consumers must see the same ids.

    Args:
        df: Spark DataFrame to extend.
        col_name: name of the column to add (replaces an existing column of that name).

    Returns:
        A new DataFrame; ``df`` itself is left unchanged.
    """
    def uuid_bytes():
        return uuid.uuid4().bytes

    # Random output => must not be treated as a deterministic expression.
    udf_uuid_bytes = udf(uuid_bytes, BinaryType()).asNondeterministic()
    return df.withColumn(col_name, udf_uuid_bytes())

In [15]:
def make_log(status, message):
    """Record a status entry for this notebook in the bronze CSV log and in BRONZE.log_table.

    Args:
        status: short status label, e.g. 'error' or 'success'.
        message: free-text detail (callers pass ``str(exception)``).

    Side effects:
        * appends one row (Status, Date, Notebook, Message) to
          ``bronze_directory + "log.csv"``, writing the header only when the
          file does not exist yet (assumes an existing file has the same columns);
        * shows the Spark log row and appends it over JDBC to BRONZE.log_table
          with a random binary ``id``. ``dhlog`` is a DateType, so only the
          day (not the time) reaches the table.
    """
    # A single timestamp for both sinks so the CSV and the table row agree.
    now = datetime.datetime.now()

    log_file = bronze_directory + "log.csv"
    new_log = pd.DataFrame({
        'Status': [status],
        'Date': [now.strftime('%Y/%m/%d %H:%M:%S')],
        'Notebook': [notebook_name],
        'Message': [message]
    })
    # Append instead of re-reading and rewriting the whole history on every call.
    new_log.to_csv(log_file, mode='a', header=not os.path.exists(log_file), index=False)

    schema = StructType([
        StructField("status", StringType(), True),
        StructField("notebook", StringType(), True),
        StructField("message", StringType(), True),
        StructField("dhlog", DateType(), True)
    ])
    df_log = spark.createDataFrame([(status, notebook_name, message, now.date())], schema)
    df_log = add_uuid(df_log, 'id')
    df_log.show()
    df_log.write.jdbc(
        url=connection_url,
        table="BRONZE.log_table",
        mode="append",
        properties={"user": user, "password": password, "driver": "oracle.jdbc.driver.OracleDriver"}
    )

In [6]:
def append_to_csv(df, csv_path, base_directory=None):
    """Append the rows of a pandas DataFrame to a CSV file in the silver layer.

    If the file already exists, its contents are read back and concatenated
    with ``df`` (columns aligned by name) and the whole file is rewritten.
    Otherwise the file is created, together with any missing parent
    directories (e.g. the ``CARTOES/`` sub-folder).

    Args:
        df: pandas DataFrame to append.
        csv_path: path relative to ``base_directory``, e.g. "CARTOES/TB_ARQUIVO.csv".
        base_directory: prefix for ``csv_path``; defaults to the notebook-level
            ``silver_directory``. It is concatenated as a plain string, so it
            should end with a path separator.
    """
    if base_directory is None:
        base_directory = silver_directory
    full_path = base_directory + csv_path

    # to_csv does not create folders; make sure the target directory exists.
    parent_dir = os.path.dirname(full_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    if os.path.exists(full_path):
        existing_df = pd.read_csv(full_path)
        df = pd.concat([existing_df, df], ignore_index=True)

    df.to_csv(full_path, index=False)

In [8]:
def queryData(query):
    """Run ``query`` against the Oracle database over JDBC.

    Connection details come from the notebook-level ``connection_url``,
    ``user`` and ``password``.

    Returns:
        Spark DataFrame with the query result.
    """
    jdbc_options = {
        "url": connection_url,
        "driver": "oracle.jdbc.OracleDriver",
        "query": query,
        "user": user,
        "password": password,
    }
    return spark.read.format("jdbc").options(**jdbc_options).load()

In [9]:
def process_file(filepath, file_name):
    """Parse the detail records of one fixed-width CEXT file.

    The first and last lines (header and trailer) are skipped. Every other
    line becomes a dict with:
      - card_number: characters [6:25], whitespace-stripped
      - date: characters [34:42], whitespace-stripped
      - line_content: the record without its line terminator
      - file_name: the ``file_name`` argument
      - line_number: 1-based position among the detail records (header not counted)

    Args:
        filepath: path of the file to read.
        file_name: name stored in each row.

    Returns:
        List of row dicts; empty when the file has fewer than three lines.
    """
    with open(filepath, 'r') as file:
        lines = file.readlines()

    rows = []
    for line_number, raw_line in enumerate(lines[1:-1], start=1):
        # Drop only the line terminator: trailing spaces belong to the fixed-width record.
        record = raw_line.rstrip('\r\n')
        rows.append({
            "card_number": record[6:25].strip(),
            "date": record[34:42].strip(),
            "line_content": record,
            "file_name": file_name,
            "line_number": line_number,
        })
    return rows

In [10]:

def _has_header_and_trailer(filepath):
    """Return True when the file's first line starts with "CEXT0" and its last line with "CEXT9"."""
    with open(filepath, 'r') as file:
        first_line = file.readline()
        if not first_line:
            # Empty file: no header, so it cannot be a complete CEXT file.
            return False
        # Stream to the last line instead of loading the whole file.
        last_line = first_line
        for last_line in file:
            pass
    return first_line.startswith("CEXT0") and last_line.startswith("CEXT9")


def filter_files(directory, start_pattern, end_extension, state_directory=None):
    """Return the not-yet-read, complete CEXT files found in ``directory``.

    A file qualifies when its name starts with ``start_pattern`` and ends with
    ``end_extension``, it is not listed in the "readed_cxt" register, and its
    first/last lines start with "CEXT0"/"CEXT9". The qualifying names are
    appended to the register before returning.

    NOTE(review): files are registered as read here, before they are parsed
    and loaded; if a later step fails they will not be picked up again.

    Args:
        directory: folder to scan (non-recursive).
        start_pattern: required filename prefix.
        end_extension: required filename suffix.
        state_directory: folder holding the "readed_cxt" register; defaults to
            the notebook-level ``bronze_directory``. Concatenated as a plain
            string, so it should end with a path separator.

    Returns:
        List of qualifying file names (not full paths), in sorted order.
    """
    if state_directory is None:
        state_directory = bronze_directory
    register_path = state_directory + "readed_cxt"

    already_read_files = set()
    if os.path.exists(register_path):
        with open(register_path, 'r') as register:
            already_read_files = {line.strip() for line in register}

    new_files = []
    # sorted() makes the processing order deterministic (os.listdir order is arbitrary).
    for filename in sorted(os.listdir(directory)):
        if not (filename.startswith(start_pattern) and filename.endswith(end_extension)):
            continue
        if filename in already_read_files:
            continue
        filepath = os.path.join(directory, filename)
        if os.path.isfile(filepath) and _has_header_and_trailer(filepath):
            new_files.append(filename)

    if new_files:
        with open(register_path, 'a') as register:
            register.writelines(name + '\n' for name in new_files)

    return new_files

In [11]:
# Select the new, complete CEXT files for this run. files_list is pre-set so
# the following cells still run (with nothing to process) when the scan fails
# and the error is only logged.
files_list = []
try:
    files_list = filter_files(directory, start_pattern, end_extension)
except Exception as e:
    make_log('error', str(e))

In [12]:
# Show the file names selected for this run.
display(files_list)

['CEXT_7562011_20240125_0002504.CCB']

In [13]:
# Parse every selected file into row dicts and dump the raw rows to data.json.
# NOTE(review): data.json is written to the current working directory and is
# overwritten on every run; it looks like a debugging snapshot — confirm it is needed.
# If one file fails, the rows parsed before it are kept and loaded downstream.
all_rows = []
try:
    for file_name in files_list:
        # Build the path the same way filter_files does when it validates the file.
        all_rows.extend(process_file(os.path.join(directory, file_name), file_name))

    with open("data.json", "w") as f:
        json.dump(all_rows, f)
except Exception as e:
    make_log('error', str(e))

In [14]:
# Turn the parsed rows into a Spark DataFrame and type card_number as DECIMAL(16, 0)
# so it can be compared with the Oracle card table.
# NOTE(review): card numbers that do not fit DECIMAL(16, 0) presumably cast to
# null (non-ANSI mode) and are then dropped by the card lookup — confirm 16 digits suffice.
df_transactions = None
try:
    if all_rows:
        df_transactions = spark.createDataFrame(all_rows)
        card_as_decimal = df_transactions["card_number"].cast(DecimalType(16, 0))
        df_transactions = df_transactions.withColumn("card_number", card_as_decimal)
except Exception as err:
    make_log('error', str(err))

In [15]:
# Preview the parsed transactions (or report that there are none).
if df_transactions is None:
    print("df_transactions is None: ", df_transactions)
else:
    df_transactions.show()

+----------------+--------+--------------------+--------------------+-----------+
|     card_number|    date|           file_name|        line_content|line_number|
+----------------+--------+--------------------+--------------------+-----------+
|5151070044381239|20230612|CEXT_7562011_2024...|75600051510700443...|          1|
|5151070044381239|20230612|CEXT_7562011_2024...|75600051510700443...|          2|
|5151070044381239|20230612|CEXT_7562011_2024...|75600051510700443...|          3|
|5151070432093151|20230612|CEXT_7562011_2024...|75600051510704320...|          4|
|5151070432093151|20230612|CEXT_7562011_2024...|75600051510704320...|          5|
|5151070432093151|20230612|CEXT_7562011_2024...|75600051510704320...|          6|
|5151070432093151|20230612|CEXT_7562011_2024...|75600051510704320...|          7|
|5151940230409696|20230612|CEXT_7562011_2024...|75600051519402304...|          8|
|5151940230409696|20230612|CEXT_7562011_2024...|75600051519402304...|          9|
|515107004438701

In [16]:
# Fetch the registered cards that appear in the file, plus the control id for
# this file type (NRARQUIVO = 2).
# Oracle rejects IN lists longer than 1000 expressions (ORA-01795), so the card
# numbers are split into chunks that are OR-ed together. The values are numeric
# literals (card_number was cast to DECIMAL), so building the list as text does
# not inject arbitrary SQL.
ORACLE_MAX_IN_LIST = 1000
df_cards = None
df_control = None
try:
    if df_transactions is not None:
        cards_number = [
            card
            for card in df_transactions.select(col("card_number").cast("string")).distinct().rdd.flatMap(lambda x: x).collect()
            if card is not None  # failed DECIMAL casts come back as null
        ]
        if cards_number:
            in_clauses = [
                'NRCARTAO IN ({})'.format(', '.join(cards_number[i:i + ORACLE_MAX_IN_LIST]))
                for i in range(0, len(cards_number), ORACLE_MAX_IN_LIST)
            ]
            where_clause = '(' + ' OR '.join(in_clauses) + ')'
        else:
            # No usable card numbers: keep the table's schema but return no rows.
            where_clause = '1 = 0'
        df_cards = queryData('SELECT * FROM CARTOES.TB_CARTAO WHERE {}'.format(where_clause))
        df_control = queryData("""
                SELECT IDARQUIVO_CONTROLE 
                FROM CARTOES.TB_ARQUIVO_CONTROLE 
                WHERE NRARQUIVO = 2
                """)
except Exception as e:
    print(e)
    make_log('error', str(e))

In [17]:
# Keep only transactions whose card exists in CARTOES.TB_CARTAO.
# A left-semi join filters without bringing card columns in, and without
# duplicating a transaction when NRCARTAO matches more than one card row.
try:
    if df_transactions is not None:
        df_transactions = (
            df_transactions
            .join(df_cards, df_transactions["card_number"] == df_cards["NRCARTAO"], "left_semi")
            .select("date", "card_number", "file_name", "line_content", "line_number")
        )
except Exception as e:
    make_log('error', str(e))

In [18]:
# Normalise the dates and attach ids/status:
#   date             yyyyMMdd string -> dd/MM/yyyy string
#   date_now         processing date as a dd/MM/yyyy string
#   IDARQUIVO        one random id per source file, shared by all of its lines
#   CDSITUACAO       status code, fixed to 1
#   IDARQUIVO_LINHA  one random id per line
# NOTE(review): the dates stay strings; if the Oracle columns are DATE, the
# insert relies on implicit conversion (NLS_DATE_FORMAT) — confirm.
try:
    if df_transactions is not None:
        df_transactions = (
            df_transactions
            .withColumn("date", date_format(to_date(col("date"), "yyyyMMdd"), "dd/MM/yyyy"))
            .withColumn("date_now", date_format(current_date(), "dd/MM/yyyy"))
        )

        # A single run-wide lit() id would be shared by every file processed
        # together, so generate one id per distinct file name on the driver.
        file_ids = [
            (row["file_name"], uuid.uuid4().bytes)
            for row in df_transactions.select("file_name").distinct().collect()
        ]
        df_file_ids = spark.createDataFrame(
            file_ids,
            StructType([
                StructField("file_name", StringType(), False),
                StructField("IDARQUIVO", BinaryType(), False),
            ]),
        )
        df_transactions = (
            df_transactions
            .join(df_file_ids, on="file_name", how="inner")
            # Restore the column order expected by the following cells.
            .select("date", "card_number", "file_name", "line_content", "line_number",
                    "date_now", "IDARQUIVO")
            .withColumn('CDSITUACAO', lit(1))
        )

        df_transactions = add_uuid(df_transactions, "IDARQUIVO_LINHA")
except Exception as e:
    make_log('error', str(e))

In [19]:
# Preview the enriched transactions (or report that there are none).
if df_transactions is None:
    print("df_transactions is None: ", df_transactions)
else:
    df_transactions.show()

+----------+----------------+--------------------+--------------------+-----------+----------+--------------------+----------+--------------------+
|      date|     card_number|           file_name|        line_content|line_number|  date_now|           IDARQUIVO|CDSITUACAO|     IDARQUIVO_LINHA|
+----------+----------------+--------------------+--------------------+-----------+----------+--------------------+----------+--------------------+
|12/06/2023|5151070044381239|CEXT_7562011_2024...|75600051510700443...|          1|10/06/2024|[86 9E 12 59 19 E...|         1|[CC 1C 76 80 38 C...|
|12/06/2023|5151070044381239|CEXT_7562011_2024...|75600051510700443...|          2|10/06/2024|[86 9E 12 59 19 E...|         1|[C3 A8 20 16 1E D...|
|12/06/2023|5151070044381239|CEXT_7562011_2024...|75600051510700443...|          3|10/06/2024|[86 9E 12 59 19 E...|         1|[BE 35 E2 14 AF A...|
|12/06/2023|5151070044387015|CEXT_7562011_2024...|75600051510700443...|         10|10/06/2024|[86 9E 12 59 19 E.

In [20]:
# Split the enriched transactions into the two target shapes:
#   df_archive      -> file-level columns for CARTOES.TB_ARQUIVO
#   df_archive_line -> line-level columns for CARTOES.TB_ARQUIVO_LINHA
df_archive = None
df_archive_line = None
try:
    if df_transactions is not None:
        df_archive = (
            df_transactions
            .drop("card_number", "line_content", "IDARQUIVO_LINHA", "line_number", "CDSITUACAO")
            .withColumnRenamed('file_name', 'NMARQUIVO')
            .withColumnRenamed('date', 'DTARQUIVO')
            .withColumnRenamed('date_now', 'DHREGISTRO')
        )
        df_archive_line = (
            df_transactions
            .drop("card_number", "file_name")
            .withColumnRenamed('line_content', 'DSCONTEUDO')
            .withColumnRenamed('date', 'DTPROCESSO')
            .withColumnRenamed('line_number', 'NRLINHA')
            .withColumnRenamed('date_now', 'DHREGISTRO')
        )
except Exception as err:
    make_log('error', str(err))

In [21]:
# Preview the line-level frame. Guard on the frame itself: it stays None when
# the split cell fails, even if df_transactions exists.
# The random IDARQUIVO_LINHA values are recomputed by each action, so the ids
# shown here are not necessarily the ones written to the database.
if df_archive_line is not None:
    df_archive_line.show()
else:
    print("df_archive_line is None: ", df_archive_line)

+----------+--------------------+-------+----------+--------------------+----------+--------------------+
|DTPROCESSO|          DSCONTEUDO|NRLINHA|DHREGISTRO|           IDARQUIVO|CDSITUACAO|     IDARQUIVO_LINHA|
+----------+--------------------+-------+----------+--------------------+----------+--------------------+
|12/06/2023|75600051510700443...|      1|10/06/2024|[86 9E 12 59 19 E...|         1|[E5 D8 17 05 EE 6...|
|12/06/2023|75600051510700443...|      2|10/06/2024|[86 9E 12 59 19 E...|         1|[B0 61 11 37 67 6...|
|12/06/2023|75600051510700443...|      3|10/06/2024|[86 9E 12 59 19 E...|         1|[87 5F B0 78 91 0...|
|12/06/2023|75600051510700443...|     10|10/06/2024|[86 9E 12 59 19 E...|         1|[72 48 EC A5 B8 B...|
+----------+--------------------+-------+----------+--------------------+----------+--------------------+



In [22]:
# Attach the control id to every archive row, then drop duplicate rows.
# NOTE: not idempotent — re-running this cell cross-joins df_control again.
try:
    if df_archive is not None:
        df_archive = df_archive.crossJoin(df_control).distinct()
except Exception as err:
    make_log('error', str(err))


In [23]:

# Preview the file-level frame. Guard on the frame itself: it stays None when
# the split cell fails, even if df_transactions exists.
if df_archive is not None:
    df_archive.show()
else:
    print("df_archive is None: ", df_archive)

+----------+--------------------+----------+--------------------+--------------------+
| DTARQUIVO|           NMARQUIVO|DHREGISTRO|           IDARQUIVO|  IDARQUIVO_CONTROLE|
+----------+--------------------+----------+--------------------+--------------------+
|12/06/2023|CEXT_7562011_2024...|10/06/2024|[86 9E 12 59 19 E...|[FF 95 CE C4 CE 7...|
+----------+--------------------+----------+--------------------+--------------------+



In [24]:
# Load the file-level rows into CARTOES.TB_ARQUIVO and mirror them to the silver CSV.
try:
    if df_archive is not None:
        archive_jdbc_properties = {
            "user": user,
            "password": password,
            "driver": "oracle.jdbc.driver.OracleDriver",
        }
        df_archive.write.jdbc(
            url=connection_url,
            table="CARTOES.TB_ARQUIVO",
            mode="append",
            properties=archive_jdbc_properties,
        )
        df_pandas = df_archive.toPandas()
        append_to_csv(df_pandas, "CARTOES/TB_ARQUIVO.csv")
except Exception as err:
    print(err)
    make_log('error', str(err))

In [25]:
# Load the line-level rows into CARTOES.TB_ARQUIVO_LINHA and mirror them to the silver CSV.
# IDARQUIVO_LINHA comes from a random UDF that Spark recomputes on every action,
# so without materializing, the JDBC write and toPandas() would each generate
# different ids. localCheckpoint() computes the frame once (eagerly) and both
# sinks then read those same rows.
try:
    if df_archive_line is not None:
        df_archive_line = df_archive_line.localCheckpoint()
        df_archive_line.write.jdbc(
                url=connection_url,
                table="CARTOES.TB_ARQUIVO_LINHA",
                mode="append",
                properties={"user": user, "password": password, "driver": "oracle.jdbc.driver.OracleDriver"}
            )
        df_pandas = df_archive_line.toPandas()
        append_to_csv(df_pandas, "CARTOES/TB_ARQUIVO_LINHA.csv")
except Exception as e:
    print(e)
    make_log('error', str(e))

In [16]:
# Record the end of the run.
# NOTE(review): this runs unconditionally, so a "success" entry is written even
# when earlier cells logged errors — consider tracking a failure flag.
make_log("success", "notebook ran successfully")

+-------+------------------+--------------------+----------+--------------------+
| status|          notebook|             message|     dhlog|                  id|
+-------+------------------+--------------------+----------+--------------------+
|success|TestePyspark.ipynb|notebook ran succ...|2024-06-10|[0C D6 84 7E 28 8...|
+-------+------------------+--------------------+----------+--------------------+



NameError: name 'dhlog' is not defined