Autor: Adolfo Eliazat

Assunto: Ingestão dos dados de transações (operações) da API SpotPass para a tabela Delta spotpass_operations

Atualizações:

In [14]:
import json
import os
from datetime import datetime, date, timedelta

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType

StatementMeta(, bfd6a0b8-25b2-42a6-ac45-93c18669abd6, 16, Finished, Available)

In [2]:
# Build (or reuse) the Spark session with the settings this load relies on.
# NOTE(review): if a session is already running (as in a hosted notebook), getOrCreate
# returns it and static settings such as master/executor memory/cores are not applied;
# only runtime SQL settings take effect. Confirm cluster-level settings where they are managed.
spark = (
    SparkSession.builder
    .master('local')
    .appName('NOTEBOOK_SPOTPASS_LOAD_API_EVENTS_FULL')
    .config('spark.executor.memory', '5gb')
    .config("spark.cores.max", "6")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    # The key was previously misspelled as "park.sql...", so this setting was silently ignored.
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
    .config("spark.microsoft.delta.optimizeWrite.enabled", "true")
    .config("spark.sql.parquet.vorder.enabled", "true")
    .getOrCreate()
)

StatementMeta(, bfd6a0b8-25b2-42a6-ac45-93c18669abd6, 4, Finished, Available)

In [3]:
# Writer options applied to the Delta write via .options(**write_setup).
# Two entries that previously lived here had no effect and were misleading:
#   - "spark.sql.files.maxPartitionBytes" controls how *input* files are split when reading;
#     it is not a writer option.
#   - "spark.sql.parquet.output.committer.class" named
#     org.apache.spark.sql.parquet.DirectParquetOutputCommitter, a class that no longer exists
#     in Spark (it was removed because it could lose data); Delta commits through its own log.
write_setup = {
    "compression": "gzip",  # Parquet codec for the written files (gzip instead of snappy)
    "maxRecordsPerFile": 500000,  # upper bound on records per output file
}

StatementMeta(, bfd6a0b8-25b2-42a6-ac45-93c18669abd6, 5, Finished, Available)

**Configurações de data e hora**

In [4]:
# Load reference date: one day before the driver's current local time (D-1).
# NOTE: assigning `date` rebinds the name imported from datetime in the imports cell.
date_time = datetime.now() - timedelta(days=1)
date = format(date_time, '%Y-%m-%d')

StatementMeta(, bfd6a0b8-25b2-42a6-ac45-93c18669abd6, 6, Finished, Available)

In [5]:
# Lakehouse-relative *path* (not a bare table name) of the Delta table; passed to .save() in the write cell.
delta_table_name = "Tables/spotpass_operations"

StatementMeta(, bfd6a0b8-25b2-42a6-ac45-93c18669abd6, 7, Finished, Available)

In [8]:
# Explicit Spark schema for the records under the API response's "content" key.
# Every field is declared nullable; amounts, quantities and dates are kept as strings,
# and only `id` and `payments.installments` are integers.

# pos_information.event: all string fields.
event_struct = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("id", "event_type", "name", "start_date", "end_date")
])

# Element type of the operation_items array: all string fields.
operation_item_struct = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "quantity",
        "unit_price",
        "product_name",
        "category_name",
        "created_at",
        "updated_at",
    )
])

# Element type of the payments array.
payment_struct = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("total", StringType()),
        ("installments", IntegerType()),
        ("payment_type_name", StringType()),
        ("acquirer", StringType()),
        ("primary_code", StringType()),
        ("status", StringType()),
        ("date", StringType()),
    )
])

schema = StructType([
    StructField("id", IntegerType(), True),
    *(
        StructField(field_name, StringType(), True)
        for field_name in ("operation_type", "operation_date", "total", "created_at", "updated_at")
    ),
    StructField("pos_information", StructType([StructField("event", event_struct, True)]), True),
    StructField("operation_items", ArrayType(operation_item_struct), True),
    StructField("payments", ArrayType(payment_struct), True),
    StructField("invoice", StringType(), True),
])

StatementMeta(, bfd6a0b8-25b2-42a6-ac45-93c18669abd6, 10, Finished, Available)

In [11]:
# Function to fetch and decode one API page
def fetch_data(url, request_headers=None, timeout=60):
    """Fetch ``url`` with an HTTP GET and return the decoded JSON body.

    Args:
        url: Full request URL (for the operations API this includes ``?page=N``).
        request_headers: Headers to send. When omitted, the module-level ``headers``
            dict (defined in the ingestion cell) is looked up at call time.
        timeout: Seconds to wait on the connection/read before ``requests`` raises a
            timeout error. Without a timeout a stalled server blocks the notebook forever.

    Returns:
        The parsed JSON payload when the response status is 200; otherwise ``None``,
        after printing the failing URL and HTTP status code.

    Raises:
        requests.exceptions.RequestException: on connection failures or timeouts.
    """
    if request_headers is None:
        request_headers = headers
    response = requests.get(url, headers=request_headers, timeout=timeout)
    if response.status_code == 200:
        return response.json()
    print(f"Error fetching data from {url} (HTTP {response.status_code})")
    return None

# Function to pull the list of operations out of one decoded API page
def extract_operations(response_data):
    """Return the records under the "content" key of a decoded page.

    A missing page (``None``) or a page without "content" yields an empty list,
    which the pagination loop treats as the end of the data.
    """
    return [] if response_data is None else response_data.get("content", [])

StatementMeta(, bfd6a0b8-25b2-42a6-ac45-93c18669abd6, 13, Finished, Available)

In [22]:
# Reuse the active Spark session (getOrCreate returns the session built earlier if one exists).
spark = SparkSession.builder \
    .appName("API Pagination") \
    .getOrCreate()

# Paginated endpoint of the SpotPass operations API (?page=N, starting at 1).
base_url = "https://public.api.spotpass.com.br/operations"

# The API token must not be hardcoded in the notebook. Read it from the environment.
# TODO: source it from the workspace secret store (e.g. Key Vault) instead.
# NOTE(review): the token that was previously hardcoded here is exposed in the notebook's
# history and should be rotated.
api_token = os.environ.get("SPOTPASS_API_TOKEN")
if not api_token:
    raise RuntimeError(
        "SPOTPASS_API_TOKEN is not set; provide the SpotPass API token through the "
        "environment or a secret store instead of hardcoding it in the notebook."
    )

# Request headers; fetch_data reads this module-level name by default.
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "AUTHORIZATION": api_token,
}

# Read pages until the API returns an empty "content" list.
all_operations = []
page = 1
while True:
    url = f"{base_url}?page={page}"
    print(f"Fetching data from {url}")

    response_data = fetch_data(url)
    # fetch_data returns None on a non-200 response. Treating that as "no more pages" would
    # keep only the pages read so far, and the overwrite write later in the notebook would
    # then replace the whole table with partial data, so stop the run instead.
    if response_data is None:
        raise RuntimeError(f"Aborting ingestion: request for page {page} failed ({url}).")

    operations = extract_operations(response_data)
    if not operations:
        break

    all_operations.extend(operations)
    page += 1

# One row per operation, typed by the explicit schema defined earlier.
df = spark.createDataFrame(all_operations, schema)

# Flatten pos_information.event into top-level event_* columns.
df = df.withColumn("event_id", col("pos_information.event.id")) \
       .withColumn("event_type", col("pos_information.event.event_type")) \
       .withColumn("event_name", col("pos_information.event.name")) \
       .withColumn("event_start_date", col("pos_information.event.start_date")) \
       .withColumn("event_end_date", col("pos_information.event.end_date")) \
       .drop("pos_information")

# One row per operation item. The item fields are referenced with col(...) so they resolve
# against the exploded plan; df["exploded_items.<field>"] would be resolved against the
# DataFrame *before* exploded_items existed and fail.
# Note: explode() drops operations whose operation_items is null or empty
# (explode_outer would keep them).
df = df.withColumn("exploded_items", explode(col("operation_items"))) \
       .withColumn("quantity", col("exploded_items.quantity")) \
       .withColumn("unit_price", col("exploded_items.unit_price")) \
       .withColumn("product_name", col("exploded_items.product_name")) \
       .withColumn("category_name", col("exploded_items.category_name")) \
       .withColumn("item_created_at", col("exploded_items.created_at")) \
       .withColumn("item_updated_at", col("exploded_items.updated_at")) \
       .drop("operation_items") \
       .drop("exploded_items")


StatementMeta(, bfd6a0b8-25b2-42a6-ac45-93c18669abd6, 24, Submitted, Running)

Fetching data from https://public.api.spotpass.com.br/operations?page=1
Fetching data from https://public.api.spotpass.com.br/operations?page=2
Fetching data from https://public.api.spotpass.com.br/operations?page=3
Fetching data from https://public.api.spotpass.com.br/operations?page=4
Fetching data from https://public.api.spotpass.com.br/operations?page=5
Fetching data from https://public.api.spotpass.com.br/operations?page=6
Fetching data from https://public.api.spotpass.com.br/operations?page=7
Fetching data from https://public.api.spotpass.com.br/operations?page=8
Fetching data from https://public.api.spotpass.com.br/operations?page=9
Fetching data from https://public.api.spotpass.com.br/operations?page=10
Fetching data from https://public.api.spotpass.com.br/operations?page=11
Fetching data from https://public.api.spotpass.com.br/operations?page=12
Fetching data from https://public.api.spotpass.com.br/operations?page=13
Fetching data from https://public.api.spotpass.com.br/operat

In [25]:
# Preview the DataFrame built in the ingestion cell using the notebook's rich display widget.
display(df)

StatementMeta(, 3a198fef-d98b-41ea-bdeb-d9274de83553, 26, Finished, Available)

SynapseWidget(Synapse.DataFrame, f42da860-c031-4814-854a-e685c0cf54e8)

In [26]:
# Number of rows in the operations DataFrame about to be written. After the explode step
# each row is one operation item, so this is neither an account count (the old label)
# nor an operation count.
row_count = df.count()
print("Operations DataFrame row count: {}".format(row_count))

StatementMeta(, 3a198fef-d98b-41ea-bdeb-d9274de83553, 27, Finished, Available)

Accounts count: 150


In [27]:
# Persist the DataFrame as a Delta table at delta_table_name, replacing previous contents.
# The write is skipped when there are no rows, so an empty result leaves the existing table as is.
has_rows = not df.isEmpty()
if has_rows:
    delta_writer = (
        df.write
        .format("delta")
        .mode("overwrite")
        .options(**write_setup)
        .option("mergeSchema", "true")
        .option("parquet.vorder.enabled", "force_true")
    )
    delta_writer.save(delta_table_name)

StatementMeta(, 3a198fef-d98b-41ea-bdeb-d9274de83553, 28, Finished, Available)

In [31]:
%%sql
-- Sanity check: total row count of the spotpass_operations table in LAKEHOUSE_ELDOURADO
-- (presumably the table saved to Tables/spotpass_operations above; confirm the lakehouse).
SELECT count(*) 
FROM LAKEHOUSE_ELDOURADO.spotpass_operations

StatementMeta(, 3a198fef-d98b-41ea-bdeb-d9274de83553, 32, Finished, Available)

InterpreterError: Fail to start interpreter.
detail: requirement failed: SparkContext has been stopped
error message: 


In [29]:
%%sql
-- Inspect the spotpass_operations table. %%sql cells do not substitute Python variables such as
-- delta_table_name (which holds a path, "Tables/spotpass_operations", not a table name), and a
-- single-quoted string is not a valid table identifier, so the table is referenced by name.
SELECT * 
FROM LAKEHOUSE_ELDOURADO.spotpass_operations

StatementMeta(, 3a198fef-d98b-41ea-bdeb-d9274de83553, 30, Finished, Available)

InterpreterError: Fail to start interpreter.
detail: requirement failed: SparkContext has been stopped
error message: 
