In [2]:
from pyspark.sql import SparkSession

In [3]:
appName = 'Python Spark SQL basic example'

spark_jars = '/home/sade/.local/share/DBeaverData/drivers/maven/maven-central/org.postgresql/postgresql-42.2.5.jar'
# spark_jars: Postgres' ye bağlanmak için gereken bir şey. Veritabanı işlemleri için DBaver kullanıyorum.
# Bu jar' ı DBaver kendi indiriyor. Spark içinde bu jar gerekliydi ben de DBaver' ın indirdiği jar' ı verdim.

spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.jars", spark_jars) \
    .getOrCreate()

In [5]:
# PostgreSQL' den veri çekme.

format_ = 'jdbc' # Ne olduğuna dair bir bilgim yok.
url = 'jdbc:postgresql://localhost:5432/postgres' # Postgre' ye bağlanmak için gereken host, port ve db adı.
query = '(select * from postgres_table) pt' # Veritabanından çekilecek sorgu. Buraya sadece isimde gelebiliyor.
user = 'postgres' # DB kullanıcı adı
password = '"123"' # DB şifre
driver = 'org.postgresql.Driver' # Ne olduğunu bilmiyorum. Benim baktığım örnekte Oracle' a bağlanıyordu. 
# Ben de o örneğe bakarak bunu BDaver' dan kopyaladım. 


# Sorgu çekiliyor ve gelen cevap bir DataFrame' e dönüştürülüyor.
read_table = spark.read \
    .format(format_) \
    .option("url", url) \
    .option("dbtable", query) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .load()

In [10]:
print("read_table type: ", type(read_table))

read_table type:  <class 'pyspark.sql.dataframe.DataFrame'>


In [11]:
# read_table' ın içindeki veriler
read_table.show()

+-----------+--------------------+--------+-----------+----------+
|      event|           messageid|  userid|  productid|    source|
+-----------+--------------------+--------+-----------+----------+
|ProductView|435784df-1a3c-4fc...| user-78| product-99|   desktop|
|ProductView|920b451e-b364-42f...|user-228|product-523|   desktop|
|ProductView|a9f9ef41-5f52-4c2...|user-233|product-759|mobile-web|
|ProductView|2af77044-3c4a-429...|user-486|product-604|   desktop|
|ProductView|4e12d021-d66f-4f3...| user-71| product-70|   desktop|
|ProductView|1ce6faca-818d-408...| user-90| product-20|mobile-web|
|ProductView|38ce0527-dc5e-434...|user-234| product-72|mobile-app|
|ProductView|eaccc257-5f7a-43c...|user-220|product-622|mobile-web|
|ProductView|dfaee9bb-42d9-483...|user-451| product-24|   desktop|
|ProductView|62b88fb6-6249-4c7...| user-10|product-155|mobile-app|
|ProductView|981c19e5-2e7c-451...| user-60|product-169|mobile-web|
|ProductView|dff5dbde-593a-4cd...|user-165|product-535|mobile-

In [13]:
# Şimdi sorgu çekme işlemi daha hızlı olsun diye bir tane method yazalım

def spark_query(
    query: str,
    format_: str = 'jdbc',
    url: str = 'jdbc:postgresql://localhost:5432/postgres',
    user: str = 'postgres',
    password: str = '"123"',
    driver: str = 'org.postgresql.Driver'
):
    return spark.read \
        .format(format_) \
        .option("url", url) \
        .option("dbtable", query) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .load()

# Şimdi deniyelim ve çektiği verileri göstersin
read_table_method = spark_query(query='(select * from postgres_table) pt')

read_table_method.show()

+-----------+--------------------+--------+-----------+----------+
|      event|           messageid|  userid|  productid|    source|
+-----------+--------------------+--------+-----------+----------+
|ProductView|435784df-1a3c-4fc...| user-78| product-99|   desktop|
|ProductView|920b451e-b364-42f...|user-228|product-523|   desktop|
|ProductView|a9f9ef41-5f52-4c2...|user-233|product-759|mobile-web|
|ProductView|2af77044-3c4a-429...|user-486|product-604|   desktop|
|ProductView|4e12d021-d66f-4f3...| user-71| product-70|   desktop|
|ProductView|1ce6faca-818d-408...| user-90| product-20|mobile-web|
|ProductView|38ce0527-dc5e-434...|user-234| product-72|mobile-app|
|ProductView|eaccc257-5f7a-43c...|user-220|product-622|mobile-web|
|ProductView|dfaee9bb-42d9-483...|user-451| product-24|   desktop|
|ProductView|62b88fb6-6249-4c7...| user-10|product-155|mobile-app|
|ProductView|981c19e5-2e7c-451...| user-60|product-169|mobile-web|
|ProductView|dff5dbde-593a-4cd...|user-165|product-535|mobile-

In [18]:
# Sanırım Spark' ın kendine ait bir sorgu dili var. Çünkü bu şekildeki normal veritabanı sorguları çalışmıyor.
spark_query(query='SELECT * FROM postgres_table')
# Veridiği hata: ERROR: syntax error at or near "SELECT"

In [19]:
# Spark ile elde ettiğimiz veriler üzerinde bazı işlemler yapma

import pyspark.sql.functions as f


split_user_id = f.split(read_table['userid'], '-') # userid kolonunu -' ye göre parçalıyoruz.

In [24]:
print(type(split_user_id)) # Ne bilmiyorum.

<class 'pyspark.sql.column.Column'>


In [25]:
split_product_id = f.split(read_table["productid"], "-") # productid kolonunu -' ye göre parçalıyoruz.

In [26]:
# Şimdi buradaki işlem sonunda userid ve productid den yeni bir DF oluşturuyoruz ama arka planda 
# ne dönüyor bilmiyorum.
result_df = read_table.select(
    split_user_id.getItem(1).alias("userid"),
    split_product_id.getItem(1).alias("productid")
)

result_df.show()

+------+---------+
|userid|productid|
+------+---------+
|    78|       99|
|   228|      523|
|   233|      759|
|   486|      604|
|    71|       70|
|    90|       20|
|   234|       72|
|   220|      622|
|   451|       24|
|    10|      155|
|    60|      169|
|   165|      535|
|   392|      754|
|   115|      384|
|     9|       71|
|   117|      207|
|   282|      176|
|   261|       61|
|    47|      584|
|    79|      127|
+------+---------+
only showing top 20 rows



In [27]:
# Şimdi elde ettiğimiz DF' ı PostgreSQL' e yazalım.
# Burası tam olarak ne yapar bilmiyorum.

write_table = 'spark_postgres'

result_df.write.format(format_) \
    .option("url", url) \
    .option("dbtable", write_table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .mode('append') \ # Acaba veritabanının içini boşaltıp doldurma özelliği var mı? Başka hangi mod' lar var?
    .save()