In [1]:
import os

from datetime import datetime, date
from pyspark.sql import SparkSession, Row

import pandas as pd

SPARK_MASTER = os.getenv("SPARK_MASTER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")

In [3]:
# Add postgres jar
spark = (
    SparkSession.builder.appName("postgres-app")
    .master(SPARK_MASTER)
    .config("spark.jars", "/opt/spark/jars/postgresql-42.7.3.jar")
    .config("spark.executor.extraClassPath", "/opt/spark/jars/postgresql-42.7.3.jar")
    .config("spark.driver.extraClassPath", "/opt/spark/jars/postgresql-42.7.3.jar")
    .getOrCreate()
)

24/04/17 15:41:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
df = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://docker-postgres:5432/postgres")
    .option("dbtable", "information_schema.tables")
    .option("user", "postgres")
    .option("password", POSTGRES_PASSWORD)
    .load()
)

df.printSchema()

root
 |-- table_catalog: string (nullable = true)
 |-- table_schema: string (nullable = true)
 |-- table_name: string (nullable = true)
 |-- table_type: string (nullable = true)
 |-- self_referencing_column_name: string (nullable = true)
 |-- reference_generation: string (nullable = true)
 |-- user_defined_type_catalog: string (nullable = true)
 |-- user_defined_type_schema: string (nullable = true)
 |-- user_defined_type_name: string (nullable = true)
 |-- is_insertable_into: string (nullable = true)
 |-- is_typed: string (nullable = true)
 |-- commit_action: string (nullable = true)



In [5]:
df.show()

                                                                                

+-------------+------------+--------------------+----------+----------------------------+--------------------+-------------------------+------------------------+----------------------+------------------+--------+-------------+
|table_catalog|table_schema|          table_name|table_type|self_referencing_column_name|reference_generation|user_defined_type_catalog|user_defined_type_schema|user_defined_type_name|is_insertable_into|is_typed|commit_action|
+-------------+------------+--------------------+----------+----------------------------+--------------------+-------------------------+------------------------+----------------------+------------------+--------+-------------+
|     postgres|      public|                test|BASE TABLE|                        NULL|                NULL|                     NULL|                    NULL|                  NULL|               YES|      NO|         NULL|
|     postgres|  pg_catalog|        pg_statistic|BASE TABLE|                        NULL|   

In [6]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [7]:
spark.sql("SHOW TABLES").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [8]:
pandas_df = pd.DataFrame(
    {
        "a": [1, 2, 3],
        "b": [2.0, 3.0, 4.0],
        "c": ["string1", "string2", "string3"],
        "d": [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
        "e": [
            datetime(2000, 1, 1, 12, 0),
            datetime(2000, 1, 2, 12, 0),
            datetime(2000, 1, 3, 12, 0),
        ],
    }
)
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [None]:
df.show()

In [None]:
df.collect()

In [8]:
df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="string1", d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3.0, c="string2", d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5.0, c="string3", d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
    ]
)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

# Thrift

In [2]:
# Connect to thrift
spark_thrift = (
    SparkSession.builder.appName("thrift_app")
    .master(SPARK_MASTER)
    .config("spark.hadoop.hive.metastore.uris", "spark://spark-master:10000")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.legacy.createHiveTableByDefault", False)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/17 18:40:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark_thrift.conf.get("spark.sql.catalogImplementation")

'hive'

In [4]:
spark_thrift.conf.get("spark.sql.legacy.createHiveTableByDefault")

'false'

In [6]:
spark_thrift.sql("create database hive_test;").show()

24/04/17 18:42:28 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (1 of 1) after 1s. getDatabase
org.apache.thrift.transport.TTransportException: java.net.SocketException: Broken pipe (Write failed)
	at org.apache.thrift.transport.TIOStreamTransport.flush(TIOStreamTransport.java:161)
	at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:73)
	at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:62)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.send_get_database(ThriftHiveMetastore.java:776)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_database(ThriftHiveMetastore.java:768)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:1288)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingM

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.thrift.transport.TTransportException

In [None]:
spark_thrift.sql(
    "create table hive_example(a string, b int) partitioned by(c int);"
).show()