In [None]:


from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
import psycopg2
import pandas as pd
import requests

# DAG definition
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# catchup=False: every run re-extracts the whole table (the extract query has
# no date filter), so backfilling one run per day since start_date would only
# repeat identical work.
# max_active_runs=1: all runs share the same /tmp/*.csv scratch files, so
# overlapping runs would overwrite each other's intermediate data.
dag = DAG(
    'etl_postgres_to_databricks',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=1,
)

# Extract: pull quotes and their line items from PostgreSQL.
def extract_data():
    """Query quotes joined with their line items, products and record types,
    and save the result to ``/tmp/raw_data.csv`` (without the index), which
    ``transform_data`` reads next.

    Connection settings come from the ``PGHOST``, ``PGDATABASE``, ``PGUSER``
    and ``PGPASSWORD`` environment variables. When a variable is unset, the
    original placeholder value is used, so credentials no longer need to be
    hardcoded in the DAG file.
    """
    import os
    from contextlib import closing

    query = """
    SELECT q.id_quote AS "Quote Id",
           q.nr_quote AS "Quote Number",
           q.id_opportunity AS "Opportunity Id",
           q.id_vendedor AS "Seller Id",
           q.id_cliente AS "Client Id",
           q.id_order AS "Order Id",
           q.id_especificador AS "Specifier Id",
           q.ds_status AS "Quote Status",
           q.dt_lastmodified::date AS "Modification Date",
           q.dt_emissao AS "Issuance Date",
           q.dt_prev_fechamento AS "Expected Closing Date",
           q.fl_orcamento_principal AS "Main Quote",
           q.dt_desejo AS "Desired Date",
           q.dt_expiracao AS "Expiration Date",
           q.cd_motivo_cancelamento AS "Cancellation Reason Code",
           q.ds_motivo_cancelamento AS "Cancellation Reason Description",
           dr.ds_name AS "Sales Type",
           sq.id AS "Quote Item Id",
           sq.nm_ambiente AS "Environment",
           sq.fl_corte_especial AS "Special Cut",
           ds_classificacao AS "Product Classification",
           ds_formato AS "Product Format",
           ds_grandes_formatos AS "Large Format Products",
           ds_linha AS "Product Line",
           nm_produto AS "Product Name",
           ds_tipologia_cml AS "Commercial Typology",
           ds_tipologia AS "Main Typology",
           cd_unimed AS "Unit of Measure",
           ds_fase_vida AS "Life Stage",
           class_port_1 AS "Product Category 1",
           class_port_2 AS "Product Category 2",
           class_port_3 AS "Product Category 3",
           class_port_4 AS "Product Category 4"
    FROM your_schema.stage_quote q
    LEFT JOIN your_schema.stage_quotelineitem sq ON sq.id_quote = q.id_quote
    LEFT JOIN your_schema.dim_product2 p ON p.id = sq.id_product2
    LEFT JOIN your_schema.dim_produto_orc po ON po.cod_produto_ora = p.id_productcode
    LEFT JOIN your_schema.dim_recordtype dr ON dr.id = q.recordtype_id
    """
    # NOTE(review): ds_classificacao ... class_port_4 are not table-qualified;
    # presumably they come from dim_produto_orc (po). Confirm this, because a
    # column with the same name in another joined table would make the query
    # fail as ambiguous.

    con = psycopg2.connect(
        host=os.environ.get('PGHOST', 'your-host'),
        database=os.environ.get('PGDATABASE', 'your-database'),
        user=os.environ.get('PGUSER', 'your-user'),
        password=os.environ.get('PGPASSWORD', 'your-password'),
    )
    # closing() releases the connection even if the query raises. psycopg2's
    # own `with con:` only ends the transaction; it does not close it.
    with closing(con):
        df = pd.read_sql(query, con)
    df.to_csv('/tmp/raw_data.csv', index=False)

# Transform: derive the issuance year.
def transform_data(src='/tmp/raw_data.csv', dst='/tmp/transformed_data.csv'):
    """Add a ``Year`` column derived from ``Issuance Date`` and save the result.

    Args:
        src: input CSV path. Defaults to ``/tmp/raw_data.csv``, the file
            written by ``extract_data``.
        dst: output CSV path. Defaults to ``/tmp/transformed_data.csv``, the
            file uploaded by ``load_data``.
    """
    df = pd.read_csv(src)
    issued = pd.to_datetime(df['Issuance Date'], utc=True)
    # .dt.year turns into float64 (NaN) as soon as one date is missing, which
    # to_csv would write as "2023.0". The nullable Int64 dtype keeps whole
    # years and writes missing ones as empty fields.
    df['Year'] = issued.dt.year.astype('Int64')
    df.to_csv(dst, index=False)

# Load: upload the transformed CSV to Databricks (DBFS).
def load_data():
    """Upload ``/tmp/transformed_data.csv`` to DBFS at ``/tmp/transformed_data.csv``.

    Uses the DBFS REST API ``put`` endpoint with a multipart form (``path``,
    ``contents`` and ``overwrite`` fields).

    The workspace URL is read from ``DATABRICKS_HOST``, falling back to the
    original placeholder. The personal access token is read from
    ``DATABRICKS_TOKEN``.

    Raises:
        RuntimeError: if ``DATABRICKS_TOKEN`` is not set.
        requests.HTTPError: if the API responds with an error status.
    """
    import os

    host = os.environ.get('DATABRICKS_HOST', 'https://your-databricks-instance').rstrip('/')
    token = os.environ.get('DATABRICKS_TOKEN')
    if not token:
        raise RuntimeError('DATABRICKS_TOKEN environment variable is not set')

    url = f"{host}/api/2.0/dbfs/put"
    headers = {"Authorization": f"Bearer {token}"}
    # The `with` block closes the file handle after the upload.
    with open('/tmp/transformed_data.csv', 'rb') as fh:
        response = requests.post(
            url,
            headers=headers,
            data={'path': '/tmp/transformed_data.csv', 'overwrite': 'true'},
            files={'contents': fh},
            timeout=300,  # never hang the Airflow worker indefinitely
        )
    response.raise_for_status()

# DAG tasks: the extract -> transform -> load steps, bracketed by no-op
# start/end markers.
start_task = DummyOperator(dag=dag, task_id='start')
extract_task = PythonOperator(dag=dag, task_id='extract_data', python_callable=extract_data)
transform_task = PythonOperator(dag=dag, task_id='transform_data', python_callable=transform_data)
load_task = PythonOperator(dag=dag, task_id='load_data', python_callable=load_data)
end_task = DummyOperator(dag=dag, task_id='end')

# Task ordering: link each task to the next one in the linear pipeline.
pipeline = [start_task, extract_task, transform_task, load_task, end_task]
for upstream, downstream in zip(pipeline, pipeline[1:]):
    upstream.set_downstream(downstream)


In [None]:

# Databricks notebook source
from pyspark.sql import SparkSession
import pandas as pd

# Start the Spark session. getOrCreate() reuses an already-active session
# if there is one (as in a Databricks notebook).
spark = SparkSession.builder.appName("SalesDataAnalysis").getOrCreate()

# Spark resolves paths against DBFS, so the input is addressed as dbfs:/...
# The /dbfs/... form is the local FUSE mount. It is only valid for local-file
# APIs such as pandas, which writes the summary below.
SALES_CSV = "dbfs:/tmp/transformed_data.csv"
SUMMARY_CSV = "/dbfs/tmp/sales_summary.csv"

# Load the sales data (all columns are read as strings: no schema inference).
sales_raw = spark.read.format("csv").option("header", "true").load(SALES_CSV)

# Show the DataFrame schema.
sales_raw.printSchema()

# Transform: rename columns, then take Year from the leading "YYYY" of the
# issuance date. Column.substr is 1-based.
sales_renamed = (
    sales_raw
    .withColumnRenamed("Quote Id", "Quote ID")
    .withColumnRenamed("Quote Status", "Status")
)
sales_df = sales_renamed.withColumn("Year", sales_renamed["Issuance Date"].substr(1, 4))

# Convert to pandas for further analysis (collects every row to the driver).
sales_pdf = sales_df.toPandas()

# Per product: count of rows with a Quote ID, plus the first classification seen.
# NOTE(review): rows are quote line items, so a quote with several lines for
# the same product is counted more than once. Use "nunique" if distinct
# quotes are intended. Rows with no Product Name are dropped by groupby.
sales_summary = (
    sales_pdf
    .groupby("Product Name")
    .agg({"Quote ID": "count", "Product Classification": "first"})
    .reset_index()
)

print(sales_summary)

# Save the summary as CSV.
sales_summary.to_csv(SUMMARY_CSV, index=False)

# Intentionally no spark.stop(): on Databricks the session is shared with the
# notebook/cluster, and stopping it breaks later cells.
