In [53]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import Window as W
from pyspark.sql import functions as F

# Attach to the already-running SparkSession, or start one if none exists.
spark = (
    SparkSession.builder
    .getOrCreate()
)

import pandas as pd

In [51]:
import pg8000

In [52]:
# PostgreSQL connection settings, passed as keyword arguments to pg8000.connect.
# Values come from the environment so credentials are not committed with the
# notebook; the literal fallbacks are the original placeholder values.
# NOTE(review): set POSTGRES_PASSWORD (etc.) in the environment rather than
# relying on the placeholder fallbacks.
db_config = {
    'host': os.environ.get('POSTGRES_HOST', 'postgres'),
    'port': int(os.environ.get('POSTGRES_PORT', '5432')),
    'user': os.environ.get('POSTGRES_USER', 'yourusername'),
    'password': os.environ.get('POSTGRES_PASSWORD', 'yourpassword'),
    'database': os.environ.get('POSTGRES_DB', 'yourdbname'),
}

# JDBC URL for the same database, derived from db_config so the two cannot drift apart.
jdbc_url = f"jdbc:postgresql://{db_config['host']}:{db_config['port']}/{db_config['database']}"

In [59]:
# pandas dtype name -> PostgreSQL column type used when creating the table.
# Integer widths are mapped explicitly: PostgreSQL INTEGER is 32-bit, so int64
# needs BIGINT to avoid overflow. Unmapped dtypes fall back to TEXT.
_PANDAS_TO_POSTGRES_TYPE = {
    'object': 'TEXT',
    'string': 'TEXT',
    'bool': 'BOOLEAN',
    'boolean': 'BOOLEAN',
    'int8': 'SMALLINT',
    'int16': 'SMALLINT',
    'int32': 'INTEGER',
    'int64': 'BIGINT',
    'Int8': 'SMALLINT',
    'Int16': 'SMALLINT',
    'Int32': 'INTEGER',
    'Int64': 'BIGINT',
    'float32': 'REAL',
    'float64': 'FLOAT',
}


def _postgres_type_for(dtype):
    """Return the PostgreSQL column type for a pandas dtype (TEXT if unknown)."""
    name = str(dtype)
    if name.startswith('datetime64['):
        # 'datetime64[ns]' is naive; 'datetime64[ns, UTC]' (with a zone) is tz-aware.
        return 'TIMESTAMPTZ' if ',' in name else 'TIMESTAMP'
    return _PANDAS_TO_POSTGRES_TYPE.get(name, 'TEXT')


def _to_sql_value(value):
    """Map pandas/numpy missing-value scalars (None, NaN, NaT, pd.NA) to None.

    None is sent as SQL NULL; a float NaN would otherwise be stored as 'NaN'.
    Non-scalar values (lists, arrays, dicts) are passed through untouched.
    """
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


def save_spark_df_to_postgres(df, table_name='data_table', db_config=db_config):
    """Append the rows of a Spark DataFrame to a PostgreSQL table via pg8000.

    The table is created first if it does not exist. Column types are derived
    from the pandas dtypes produced by ``df.toPandas()``. Rows are always
    appended; nothing already in the table is updated or removed.

    ``toPandas()`` collects the whole DataFrame onto the driver, so this is
    only suitable for data that fits in driver memory.

    Args:
        df: Spark DataFrame to write.
        table_name: Target table. It is interpolated into the SQL unquoted, as
            are the column names, so both must be plain identifiers from a
            trusted source.
        db_config: Keyword arguments for ``pg8000.connect``.
    """
    pandas_df = df.toPandas()
    columns = pandas_df.columns.tolist()

    column_defs = ', '.join(
        f"{column} {_postgres_type_for(dtype)}"
        for column, dtype in zip(columns, pandas_df.dtypes)
    )
    create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({column_defs});"

    placeholders = ', '.join(['%s'] * len(columns))
    insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders});"

    rows = [
        tuple(_to_sql_value(value) for value in row)
        for row in pandas_df.itertuples(index=False, name=None)
    ]

    conn = pg8000.connect(**db_config)
    try:
        cursor = conn.cursor()
        cursor.execute(create_table_sql)
        if rows:
            cursor.executemany(insert_sql, rows)
        conn.commit()
        cursor.close()
    finally:
        # Always release the connection. If anything above raised, nothing was
        # committed and the server discards the open transaction on close.
        conn.close()

def read_postgres_to_spark_df(table_name='data_table', db_config=db_config, show=True):
    """Load an entire PostgreSQL table into a Spark DataFrame.

    Rows are fetched with pg8000, converted to a pandas DataFrame, and then
    to Spark, so the whole table must fit in driver memory.

    Args:
        table_name: Table to read. It is interpolated into the SQL unquoted,
            so it must be a plain identifier from a trusted source.
        db_config: Keyword arguments for ``pg8000.connect``.
        show: If True (the default, matching the original behaviour), print
            the result with ``DataFrame.show()``.

    Returns:
        The loaded Spark DataFrame. An empty table yields an empty DataFrame
        whose columns are all nullable strings.
    """
    conn = pg8000.connect(**db_config)
    try:
        cursor = conn.cursor()
        cursor.execute(f"SELECT * FROM {table_name};")
        data = cursor.fetchall()
        column_names = [desc[0] for desc in cursor.description]
        cursor.close()
    finally:
        # Release the connection even if the query fails.
        conn.close()

    if data:
        spark_df = spark.createDataFrame(pd.DataFrame(data, columns=column_names))
    else:
        # Spark's schema inference fails on an empty dataset, so fall back to
        # an explicit all-string schema carrying the table's column names.
        empty_schema = T.StructType(
            [T.StructField(name, T.StringType(), True) for name in column_names]
        )
        spark_df = spark.createDataFrame([], empty_schema)

    if show:
        spark_df.show()
    return spark_df

In [None]:
def drop_deprecated_from_postgres(df, table_name='data_table', db_config=db_config):
    """Remove rows from ``table_name`` that are made obsolete by ``df``.

    TODO: not implemented. The original cell had only this signature with an
    empty body, which is an IndentationError that stops the whole cell from
    running. The notebook never states which rows count as "deprecated".
    Presumably they are rows whose ``key`` also appears in ``df`` (older
    versions of re-saved keys); confirm before implementing.

    Args:
        df: Spark DataFrame holding the newly saved rows.
        table_name: Table to prune.
        db_config: Keyword arguments for ``pg8000.connect``.

    Raises:
        NotImplementedError: always, until the deletion rule is decided.
    """
    raise NotImplementedError(
        "drop_deprecated_from_postgres: the rule for which rows are deprecated is not defined yet"
    )

In [62]:
# Batch 1: four rows, all with value "1" for year "01" (key3 and key4 share a timestamp).
data = [
    ("key1", "1", "01", "2023-09-25 10:00:00"),
    ("key2", "1", "01", "2023-09-25 11:00:00"),
    ("key3", "1", "01", "2023-09-25 12:00:00"),
    ("key4", "1", "01", "2023-09-25 12:00:00"),
]

# Every column is a nullable string; "time" stays text rather than a Spark timestamp.
schema = T.StructType(
    [T.StructField(name, T.StringType(), True) for name in ("key", "value", "year", "time")]
)

df = spark.createDataFrame(data, schema=schema)

In [63]:
save_spark_df_to_postgres(df=df)  # append batch 1 to the default table

In [64]:
read_postgres_to_spark_df();  # prints the table via show(); ';' keeps the cell's repr quiet

+----+-----+----+-------------------+
| key|value|year|               time|
+----+-----+----+-------------------+
|key1|    1|  01|2023-09-25 10:00:00|
|key2|    1|  01|2023-09-25 11:00:00|
|key3|    1|  01|2023-09-25 12:00:00|
|key4|    1|  01|2023-09-25 12:00:00|
+----+-----+----+-------------------+



In [None]:
# Batch 2: value "2" for key1..key3, still year "01"; key4 is absent from this batch.
data = [
    ("key1", "2", "01", "2023-09-25 10:00:00"),
    ("key2", "2", "01", "2023-09-25 11:00:00"),
    ("key3", "2", "01", "2023-09-25 12:00:00"),
]

# Same all-string, all-nullable layout as batch 1.
schema = T.StructType(
    [T.StructField(name, T.StringType(), True) for name in ("key", "value", "year", "time")]
)

df = spark.createDataFrame(data, schema=schema)

# Append batch 2, drop deprecated rows, then print the resulting table.
save_spark_df_to_postgres(df)
drop_deprecated_from_postgres(df)
read_postgres_to_spark_df();

In [None]:
# Batch 3: value "3" for key1 and key2, now tagged year "02".
data = [
    ("key1", "3", "02", "2023-09-25 10:00:00"),
    ("key2", "3", "02", "2023-09-25 11:00:00"),
]

# Same all-string, all-nullable layout as the earlier batches.
schema = T.StructType(
    [T.StructField(name, T.StringType(), True) for name in ("key", "value", "year", "time")]
)

df = spark.createDataFrame(data, schema=schema)