In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=0539661da01cf55064876d425269a672a146c2076c3fe05ed67ce4536d72bcca
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

from pyspark.sql.functions import lit

from functools import reduce
from pyspark.sql import functions as F

In [None]:
import uuid
import hashlib
import time
import re
import numpy as np
import pandas as pd
import os
from google.cloud import storage
from google.oauth2 import service_account

from google.cloud import bigquery
import chardet

from google.colab import auth
auth.authenticate_user()

### **Funciones globales**

In [None]:
def download_and_save_credentials():
    """
    Descarga un archivo desde Google Cloud Storage y lo guarda localmente.

    Returns:
    - str: La ruta del archivo guardado localmente.
    """
    # Variables
    bucket_name = 'credentials-gs'
    blob_name = 'credentials.json'
    credentials_temp_path = '/tmp/google_credentials.json'

    # Crea un cliente de Google Cloud Storage
    storage_client = storage.Client()

    # Obtén el bucket y el blob
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    # Descarga el contenido del archivo
    credentials_content = blob.download_as_text()

    # Guarda el contenido en el archivo local
    with open(credentials_temp_path, 'w') as local_file:
        local_file.write(credentials_content)

    # Retorna la ruta del archivo guardado localmente
    return credentials_temp_path

In [None]:
def generate_id(num_records):
  """Genera una tabla con `num_records` registros aleatorios con valores correlativos."""
  table = []
  epoch_value = int(time.time())  # Obtener el valor de epoch

  for i in range(num_records):
      # Concatenar el epoch con el UUID para agregar más entropía
      combined_data = f"{epoch_value}-{uuid.uuid4()}"

      # Generar un hash (por ejemplo, MD5) del dato combinado para obtener un UUID más corto
      hash_uuid = hashlib.md5(combined_data.encode()).hexdigest()

      table.append({
          "donation_id": hash_uuid[:20]  # Tomar los primeros 20 caracteres (ajusta según sea necesario)
      })

  df = pd.DataFrame(table)
  return df

In [None]:
def get_gcs_bucket(bucket_name, credentials_path):
    # Inicializar el cliente de almacenamiento con credenciales desde el archivo
    credentials = service_account.Credentials.from_service_account_file(
        credentials_path,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )

    # Crear el cliente de almacenamiento
    client = storage.Client(credentials=credentials)

    # Obtener el bucket
    bucket = client.get_bucket(bucket_name)

    return bucket

In [None]:
def files_list_from_folder(bucket_name, carpeta):
    # Inicializar el cliente de almacenamiento
    client = storage.Client()

    # Obtener el bucket
    bucket = client.get_bucket(bucket_name)

    # Listar archivos en la subcarpeta especificada
    blobs = bucket.list_blobs(prefix=carpeta)

    # Filtrar archivos excluyendo la carpeta 'PAT/'
    archivos = [blob.name for blob in blobs if blob.name != carpeta]

    # Imprimir nombres de archivos en la subcarpeta (opcional)
    # for archivo in archivos:
    #     print(archivo)

    return archivos

In [None]:
def get_row_count(project_id, dataset_id, table_id):
  try:
      # Create a BigQuery client
      client = bigquery.Client(project=project_id)

      # Construir la consulta COUNT(*)
      count_sql_script = f"SELECT COUNT(*) as count_result FROM `{project_id}.{dataset_id}.{table_id}`"

      # Ejecutar la consulta
      query_job = client.query(count_sql_script)

      # Esperar a que la consulta se complete
      result = query_job.result()

      # Acceder al valor de COUNT(*)
      count_result = next(result).get('count_result')

      return count_result

  except Exception as e:
      print(f"Error executing COUNT(*) query: {str(e)}")
      return None

In [None]:
def load_df_to_bq(dataframe, project_id, dataset_id, table_id):

  client = bigquery.Client(project=project_id)

  # Define the destination table
  table_ref = client.dataset(dataset_id).table(table_id)
  #dataframe[['external_id','created_on']] = dataframe[['external_id','created_on']].astype(str)
  #if format is not None and format in dataframe.columns:
  #        dataframe[format] = dataframe[format].astype(str)

  # Create the job configuration
  job_config = bigquery.LoadJobConfig()
  job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # Replace the table

  # Load the DataFrame into BigQuery
  client.load_table_from_dataframe(dataframe, table_ref, job_config=job_config).result()

In [None]:
def bq_to_df(query):
  try:
      # Crear un cliente de BigQuery
      client = bigquery.Client(project=project_id)

      # Ejecutar la consulta SQL para realizar el upsert
      query_job = client.query(f"{query}")

      # Esperar a que se complete la consulta y almacenar los resultados en un DataFrame
      result_dataframe = query_job.to_dataframe()

      print("Operación completada en BigQuery")

      return result_dataframe

  except Exception as e:
      print(f"Error: {e}")
      return None

In [None]:
def execute_sql_script(project_id, upsert_sql_script):
  # Create a BigQuery client
  client = bigquery.Client(project=project_id)

  # Execute the SQL script to perform the upsert
  query_job = client.query(f"{upsert_sql_script}")

  query_job.result()

  # Wait for the query to complete

  print("Operation completed in BigQuery")

### **Seteo de variables de entorno**

In [None]:
spark = SparkSession.builder.appName("ProcesamientoGCS").getOrCreate()

bucket_name = "your-bucket-name"
project_id = 'your-project-id'
dataset_id = 'your-dataset-id'

credentials_path = download_and_save_credentials()
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

## **Backup BLOB 1**

In [None]:
blob1_prep = """
CREATE OR REPLACE TABLE `your-project_id.your-dataset_id.your_table-id` AS
WITH PrepStep1 AS (
  SELECT donation_id,
       data,
       CAST(LEFT(SUBSTR(data,4,22),LENGTH(SUBSTR(data,4,22))-1) AS INT64) id,
       CONCAT(SUBSTR(data,26,4),'-',SUBSTR(data,30,2),'-' ,SUBSTR(data,32,2)) transaction_date,
       CONCAT(SUBSTR(data,26,4),'-',SUBSTR(data,30,2),'-' ,SUBSTR(data,32,2)) close_date,
       CAST(SUBSTR(data,34,12) AS INT64) transaction_amount,
       CAST(SUBSTR(data,48,3) AS INT64) transaction_bank_code,
FROM `your-project_id.your-dataset_id.your_table-id`
),PrepStep2 AS (
 SELECT donation_id,
        data,
        id,
        transaction_amount,
        transaction_date,
        close_date,
        transaction_bank_code,
        IF(data NOT LIKE '%APROBADO%',3,1) transaction_status,
        REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(data,SUBSTR(data,4,22),''),SUBSTR(data,26,8),''),SUBSTR(data,48,3),''),SUBSTR(data,34,12),'') norm
FROM PrepStep1
)SELECT donation_id,
        id,
        transaction_amount,
        NULL account_number,
        2 plan_id,
        CONCAT("XXXX ",CASE WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 1 THEN 'Enero'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 2 THEN 'Febrero'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 3 THEN 'Marzo'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 4 THEN 'Abril'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 5 THEN 'Mayo'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 6 THEN 'Junio'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 7 THEN 'Julio'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 8 THEN 'Agosto'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 9 THEN 'Septiembre'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 10 THEN 'Octubre'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 11 THEN 'Noviembre'
                                WHEN CAST(SUBSTR(close_date,6,2) AS INT64) = 12 THEN 'Diciembre' END," ",SUBSTR(close_date,1,4)
                  )  transaction_source,
        1 transaction_source_code,
        transaction_date,
        close_date,
        transaction_status,
        CASE WHEN norm LIKE '%R01%' AND data NOT LIKE '%APROBADO%' THEN 'Fondos Insuficientes'
             WHEN norm LIKE '%R02%' AND data NOT LIKE '%APROBADO%' THEN 'Cuenta Cerrada / No Operativa'
             WHEN norm LIKE '%R04%' AND data NOT LIKE '%APROBADO%' THEN 'Cuenta No Existe'
             WHEN norm LIKE '%R07%' AND data NOT LIKE '%APROBADO%' THEN 'Autorizacion Revocada Por El C'
             WHEN norm LIKE '%R08%' AND data NOT LIKE '%APROBADO%' THEN 'Orden De No Cargo'
             WHEN norm LIKE '%R10%' AND data NOT LIKE '%APROBADO%' THEN 'Mandato Inexistente'
             WHEN norm LIKE '%R82%' AND data NOT LIKE '%APROBADO%' THEN 'Monto De La Operación Excede M'
             WHEN norm LIKE '%P01%' AND data NOT LIKE '%APROBADO%' THEN 'No Existe Identificador Del Cl'
             WHEN norm LIKE '%R93%' AND data NOT LIKE '%APROBADO%' THEN 'Eliminado Por El Cliente' ELSE NULL END transaction_details,
        CAST(NULL AS INT64) transaction_bank_code,
        CAST(NULL AS STRING) transaction_payment_form,
        true individual_donor
FROM PrepStep2

"""

In [None]:
def blob1(bucket_name, credentials_path, file_list, folder_prefix):
    # Inicializar SparkSession
    spark = SparkSession.builder.appName("ProcesamientoGCS").getOrCreate()

    # Obtener el bucket
    bucket = get_gcs_bucket(bucket_name, credentials_path)

    # Fijar el prefijo del directorio
    folder_prefix = folder_prefix

    # Obtener iterador de blobs en el directorio 'folder_prefix'
    blobs = bucket.list_blobs(prefix=folder_prefix)

    # Filtrar solo los blobs correspondientes a archivos, no a subdirectorios
    archivos_en_blob1 = [blob for blob in blobs if '/' not in blob.name[len(folder_prefix):] and blob.name.startswith(folder_prefix + 'your-file-pattern')]

    # Verificar si hay archivos en el storage que no estén en la lista
    archivos_a_procesar = [blob for blob in archivos_en_blob1 if blob.name[len(folder_prefix):] not in file_list]

    # Si no hay archivos para procesar, devuelve None
    if not archivos_a_procesar:
        return None

    # Procesar cada archivo en paralelo
    dataframes = []

    for blob in archivos_a_procesar:
        nombre_archivo = blob.name[len(folder_prefix):]

        # Imprimir el nombre del archivo que se está procesando
        print(f"Procesando archivo: {nombre_archivo}")

        # Procesar el archivo y obtener el DataFrame
        df = procesar_archivo_google_storage(spark, bucket_name, blob.name, credentials_path)

        # Agregar una columna con el nombre del archivo
        df = df.withColumn("filename", lit(nombre_archivo))

        dataframes.append(df)

    # Unir los DataFrames en uno solo
    combined_df = reduce(lambda df1, df2: df1.union(df2), dataframes)

    return combined_df

In [None]:
def blob1_storage(spark, bucket_name, nombre_archivo, credentials_path):
    # Obtener el bucket
    bucket = get_gcs_bucket(bucket_name, credentials_path)

    blob = bucket.blob(nombre_archivo)

    # Descargar el contenido del archivo como bytes
    contenido_bytes = blob.download_as_bytes()
    contenido_str = contenido_bytes.decode('utf-8')

    # Crear un RDD a partir de las líneas del archivo
    rdd = spark.sparkContext.parallelize(contenido_str.splitlines())

    # Crear un DataFrame a partir del RDD
    schema = StructType([StructField("data", StringType(), True)])
    df = spark.createDataFrame(rdd.map(lambda x: (x,)), schema)

    return df

In [None]:
file_list = files_list_from_folder(bucket_name, folder_prefix)

In [None]:
combined_df_pyspark = blob_pac(bucket_name, credentials_path, file_list, prefix)

In [None]:
df = combined_df_pyspark.toPandas()

In [None]:
donation_id = pd.concat([generate_id(len(df)),df], axis = 1)
load_df_to_bq(donation_id, project_id, dataset_id, 'your-table-id')
execute_sql_script(project_id, blob1_prep )

Operation completed in BigQuery


## **Backup Blob 2**

In [None]:
def blob2(bucket_name, credentials_path, file_list, folder_prefix):
    # Inicializar SparkSession
    spark = SparkSession.builder.appName("ProcesamientoGCS").getOrCreate()

    # Obtener el bucket
    bucket = get_gcs_bucket(bucket_name, credentials_path)

    # Fijar el prefijo del directorio
    folder_prefix = folder_prefix

    # Obtener iterador de blobs en el directorio 'folder_prefix'
    blobs = bucket.list_blobs(prefix=folder_prefix)

    # Filtrar solo los blobs correspondientes a archivos, no a subdirectorios
    archivos_en_blob2 = [blob for blob in blobs if '/' not in blob.name[len(folder_prefix):] and blob.name.startswith(folder_prefix + 'your-file-pattern')]

    # Verificar si hay archivos en el storage que no estén en la lista
    archivos_a_procesar = [blob for blob in archivos_en_blob2 if blob.name[len(folder_prefix):] not in file_list]

    # Si no hay archivos para procesar, devuelve None
    if not archivos_a_procesar:
        return None

    # Procesar cada archivo en paralelo
    combined_df = None

    for blob in archivos_a_procesar:
        nombre_archivo = blob.name[len(folder_prefix):]

        # Imprimir el nombre del archivo que se está procesando
        print(f"Procesando archivo: {nombre_archivo}")

        # Procesar el archivo y obtener el DataFrame
        df = storage_bf(spark, bucket_name, blob.name, credentials_path)

        # Agregar una columna con el nombre del archivo
        df = df.withColumn("filename", lit(nombre_archivo))

        # Unir el DataFrame actual con el DataFrame acumulado
        if combined_df is None:
            combined_df = df
        else:
            combined_df = combined_df.union(df)

    return combined_df

In [None]:
prep_blob2 = """
CREATE OR REPLACE TABLE `your-project_id.your-dataset_id.your_table-id` AS
WITH PrepStep1 AS (
  SELECT donation_id,
         filename,
         data,
         CASE WHEN SAFE_CAST(SUBSTR(CAST(TRIM(data) AS STRING), 1, 3) AS INT64) = 100 THEN SUBSTR(CAST(TRIM(data) AS STRING), 4, 8)
              ELSE SUBSTR(CAST(TRIM(data) AS STRING), 3, 9) END id,
         CASE WHEN SUBSTR(data, -3) LIKE "%R%" THEN RIGHT(data,3) ELSE NULL END transactional_details,
         REGEXP_EXTRACT(data, r'(.{11})Ban') account_number,
         REGEXP_REPLACE(SUBSTR(data, 84,12),r'^0*', '') amount,
         CASE WHEN CAST(SUBSTR(data, 1, 3) AS INT64) > 100 THEN TRIM(SUBSTR(data, 12, STRPOS(SUBSTR(data, 12), '0') - 1))
              ELSE TRIM(SUBSTR(data, 12, STRPOS(SUBSTR(data, 12), '0') - 1)) END AS names,
         DATE(CONCAT(SUBSTR(filename,12,4),'-',SUBSTR(filename,10,2),'-',SUBSTR(filename,8,2))) donation_date
FROM `your-project_id.your-dataset_id.your_table-id`
),PrepStep2 AS (
  SELECT donation_id,
        filename,
        data,
        id,
        account_number,
        names,
        amount,
        transactional_details,
        donation_date
 FROM PrepStep1
),PrepStep3 AS (
SELECT donation_id,
        SUBSTR(filename,1,15) filename,
        id,
        account_number,
        names,
        transactional_details,
        amount,
        donation_date,
        SPLIT(filename, '_')[OFFSET(3)] logs,
        CASE WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 1 THEN 'Enero'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 2 THEN 'Febrero'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 3 THEN 'Marzo'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 4 THEN 'Abril'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 5 THEN 'Mayo'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 6 THEN 'Junio'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 7 THEN 'Julio'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 8 THEN 'Agosto'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 9 THEN 'Septiembre'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 10 THEN 'Octubre'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 11 THEN 'Noviembre'
             WHEN EXTRACT(MONTH FROM DATE(donation_date)) = 12 THEN 'Diciembre' END month
FROM PrepStep2
)SELECT donation_id,
        filename,
        id,
        account_number,
        names,
        transactional_details,
        amount,
        donation_date,
        CONCAT('XXXX',month,' ',EXTRACT( YEAR FROM DATE(donation_date))) transaction_source,
        3 transaction_source_code,
        logs
  FROM PrepStep3
;
"""


In [None]:
def storage_blob2(spark, bucket_name, nombre_archivo, credentials_path):
    # Inicializar el cliente de almacenamiento con credenciales desde el archivo
    credentials = service_account.Credentials.from_service_account_file(
        credentials_path,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    client = storage.Client(credentials=credentials)

    # Obtener el bucket
    bucket = client.get_bucket(bucket_name)

    blob = bucket.blob(nombre_archivo)

    # Descargar el contenido del archivo como bytes
    contenido_bytes = blob.download_as_bytes()

    # Decodificar el contenido utilizando la codificación detectada (puedes ajustar la codificación según tus necesidades)
    encoding = "utf-8"  # Ajusta la codificación según tu necesidad
    contenido_str = contenido_bytes.decode(encoding)

    # Dividir el contenido en líneas
    lineas = contenido_str.splitlines()

    # Eliminar la última línea si la lista de líneas no está vacía
    if lineas:
        lineas.pop()

    # Construir el DataFrame
    df = spark.createDataFrame(lineas, "string").toDF("data")

    # Agregar una columna con el nombre del archivo
    df = df.withColumn("filename", lit(nombre_archivo))


    return df

In [None]:
file_list_blob2 = files_list_from_folder(bucket_name, 'folder_prefix/')

In [None]:
df2_pyspark = blob2(bucket_name, credentials_path, file_list_bf, 'folder_prefix/')

In [None]:
df2 = bf_df_pyspark.toPandas()

In [None]:
DonationId_generate = generate_id(len(df2))
df2_prep = pd.concat([DonationId_generate, df2], axis=1)

In [None]:
load_df_to_bq(df2_prep, project_id, dataset_id, 'your-table-id')
execute_sql_script(project_id, prep_blob2)

Operation completed in BigQuery
