In [2]:
import os

# Definir variáveis de ambiente antes de qualquer importação do Spark
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-17-openjdk-amd64'
import psycopg2.sql
from dotenv import load_dotenv
import findspark
findspark.init()
from pyspark.sql import SparkSession, Row
import json


In [3]:
load_dotenv()

# Cria conexão com o banco
try:
    conn = psycopg2.connect(os.getenv('DSN'))
    cur = conn.cursor()
except Exception as e:
    print("Erro na conexão:", e)

In [8]:
# Nome da tabela 
table_name = "tabela_eventos"

# Cria a tabela principal e as colunas se ainda não existir
cur.execute(psycopg2.sql.SQL("""CREATE TABLE IF NOT EXISTS 
                                    {} (id SERIAL PRIMARY KEY, data_row TEXT)
                            """).format(psycopg2.sql.Identifier(table_name))
            )
conn.commit()

# Valida criação da tabela e schema
cur.execute("""SELECT 
                    column_name, 
                    data_type
                FROM 
                    information_schema.columns
                WHERE 
                    table_name = 'tabela_eventos';
            """)
type = cur.fetchall()
print(type)



[('id', 'integer'), ('data_row', 'text')]


In [9]:
# Verifica e cria sessão Spark
try:
        if SparkSession.getActiveSession() is not None:
                spark = SparkSession.getActiveSession()
                print(f"Sessão Spark já ativa: {spark.sparkContext.appName}")
        else:
                spark = SparkSession.builder \
                .appName("Inserir_JSON_no_PostgreSQL") \
                .getOrCreate()
except Exception as e:
    print("Erro ao iniciar SparkSession:", str(e))

In [10]:

# import json
# from pyspark.sql import Row

# Lê o JSON bruto
with open("/home/jovyan/data/eventos_usuarios.json", "r") as f:
    raw_json = json.load(f)

# Cria RDD a partir do JSON lido
rdd = spark.sparkContext.parallelize(raw_json)

# Converte RDD para DataFrame
df = spark.read.json(rdd)

# Converte linha para string JSON
from pyspark.sql.functions import to_json, struct

# Cria string JSON em uma única coluna
df_json = df.selectExpr("to_json(struct(*)) AS data_row")
df_json.show(truncate=False)


# Escreve as linhas JSON na tabela do PostgreSQL
df_json.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/postgres") \
    .option("dbtable", f"{table_name}") \
    .option("user", os.getenv('POSTGRES_USER')) \
    .option("password", os.getenv('POSTGRES_PASSWORD')) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()



+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|data_row                                                                                                                                                                                                                                                                                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [11]:
# Verifica dados na tabela
cur.execute(psycopg2.sql.SQL("""SELECT 
                                  * 
                                FROM {}""")
            .format(psycopg2.sql.Identifier(table_name))
            )
row = cur.fetchone()
print(row)

(1, '{"device":{"os":"Android","type":"mobile"},"event_time":"2025-06-01T09:30:00Z","event_type":"product_view","location":{"city":"Curitiba","country":"Brazil"},"products":[{"category":"Electronics","id":1,"name":"Smartphone A","price":1200.0}],"session_id":"sess-007","user_id":128}')


In [12]:
# Seleciona categorias
cur.execute(psycopg2.sql.SQL(""" SELECT 
                                     p->>'category' AS product_name
                                 FROM 
                                    {}, 
                                 json_array_elements(data_row::json->'products') AS p
                            """).format(psycopg2.sql.Identifier(table_name))
            )
row = cur.fetchall()
print(row)

[('Electronics',), ('Computers',), ('Computers',), ('Accessories',), ('Computers',), ('Electronics',), ('Gaming',), ('Storage',), ('Photography',), ('Storage',), ('Accessories',), ('Gaming',), ('Storage',), ('Electronics',), ('Accessories',), ('Storage',), ('Drones',), ('Accessories',), ('Accessories',), ('Storage',), ('Storage',), ('Accessories',), ('Accessories',), ('Gaming',), ('Accessories',), ('Accessories',), ('Storage',), ('Drones',), ('Storage',), ('Photography',), ('Wearables',), ('Electronics',), ('Accessories',), ('Accessories',), ('Electronics',), ('Computers',), ('Wearables',), ('Wearables',), ('Accessories',), ('Electronics',), ('Photography',), ('Accessories',), ('Gaming',), ('Accessories',), ('Accessories',), ('Gaming',)]


In [5]:
# cur = conn.cursor()

# # Limpa todos os dados da tabela
# cur.execute(psycopg2.sql.SQL("TRUNCATE TABLE {};").format(psycopg2.sql.Identifier(table_name)))
# conn.commit()


# cur.execute(psycopg2.sql.SQL("DROP TABLE IF EXISTS {};;").format(psycopg2.sql.Identifier(table_name)))

# conn.commit()

In [7]:
# conn.rollback()  # Limpa o estado de erro anterior


In [13]:
# Conta o número de acessos por categoria
cur.execute(psycopg2.sql.SQL(""" SELECT 
                                     p->>'category' AS category, 
                                     COUNT(p->>'category') AS count
                                FROM 
                                    {}, 
                                json_array_elements(data_row::json->'products') AS p
                                GROUP BY 
                                    p->>'category' 
                                ORDER BY 
                                    count DESC
                            """).format(psycopg2.sql.Identifier(table_name))
            )
row = cur.fetchall()
print(row)

[('Accessories', 15), ('Storage', 8), ('Electronics', 6), ('Gaming', 5), ('Computers', 4), ('Photography', 3), ('Wearables', 3), ('Drones', 2)]


In [None]:
# Número de acessos e preço agrupado por categoria e país
cur.execute(psycopg2.sql.SQL(""" SELECT 
                                     p->>'category' AS category,
                                      data_row::json->'location'->>'country' AS country,
                                      COUNT(*) AS total,
                                       SUM(CAST(p->>'price' as float)) AS total_price
                                 FROM {}, 
                                 json_array_elements(data_row::json->'products') AS p
                                 GROUP BY 
                                     country,
                                     p->>'category' 
                                ORDER BY
                                    total DESC
                            """).format(psycopg2.sql.Identifier(table_name))
            )
row = cur.fetchall()
print(row)

[('Accessories', 'Brazil', 8, 2313.8), ('Storage', 'USA', 4, 917.8), ('Accessories', 'USA', 4, 699.6), ('Electronics', 'Brazil', 4, 6600.0), ('Storage', 'Brazil', 3, 179.7), ('Accessories', 'Mexico', 3, 548.8), ('Gaming', 'USA', 3, 5899.0), ('Computers', 'Argentina', 2, 1798.0), ('Drones', 'Brazil', 2, 9000.0), ('Electronics', 'USA', 2, 3000.0), ('Computers', 'USA', 2, 7000.0), ('Photography', 'Mexico', 2, 5998.0), ('Storage', 'Mexico', 1, 399.0), ('Wearables', 'Brazil', 1, 999.0), ('Photography', 'USA', 1, 2999.0), ('Gaming', 'Brazil', 1, 2800.0), ('Gaming', 'Mexico', 1, 2800.0), ('Wearables', 'USA', 1, 999.0), ('Wearables', 'Mexico', 1, 999.0)]


In [14]:
# JSON atividades do usuário
cur.execute(psycopg2.sql.SQL(""" SELECT 
                                  user_id,
                                  session_id,
                                  json_agg(json_build_object(
                                                'event_type', event_type,
                                                'event_time', event_time,
                                                'country', country,
                                                'product_detail', json_build_object(
                                                                            'product_name', product->'name',
                                                                            'product_price', product->'price',
                                                                            'product_category', product->'category'
                                                                    )
                                            )) AS eventos
                                FROM (
                                        SELECT 
                                            data_row::json->>'session_id' AS session_id,
                                            data_row::json->>'user_id' AS user_id,
                                            data_row::json->>'event_type' AS event_type,
                                            data_row::json->>'event_time' AS event_time,
                                            data_row::json->'location'->>'country' AS country,
                                            json_array_elements(data_row::json->'products') AS product
                                        FROM {}
                                    ) t
                                GROUP BY 
                                    session_id, 
                                    user_id;
                            """).format(psycopg2.sql.Identifier(table_name))
            )
row = cur.fetchone()
print(row)

('115', 'sess-001', [{'event_type': 'cart_view', 'event_time': '2025-06-01T10:50:00Z', 'country': 'Brazil', 'product_detail': {'product_name': 'Mouse Gamer', 'product_price': 249.0, 'product_category': 'Accessories'}}, {'event_type': 'cart_view', 'event_time': '2025-06-01T10:50:00Z', 'country': 'Brazil', 'product_detail': {'product_name': 'Drone Z', 'product_price': 4500.0, 'product_category': 'Drones'}}])
