In [1]:

import requests
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
import socket
import configparser
import json

# Reddit API credentials; the [REDDIT] section is read by Prepare.get_data_from_api.
# ConfigParser.read() skips files it cannot open, so a missing file only
# surfaces later as a missing-section error when a value is looked up.
config = configparser.ConfigParser()
config.read(['reddit_api/reddit-cred.config'])


class Prepare:
    """Fetch Reddit search results, flatten them and publish them to Kafka."""

    def get_data_from_api(self,topic='dataengineering'):
        """Search Reddit for ``r/<topic>`` and return the parsed JSON response.

        An access token is requested with the password grant, using the
        credentials stored in the ``REDDIT`` section of the module-level
        ``config``; the token is then used against the OAuth search endpoint.

        Args:
            topic: Name inserted into the search query as ``r/{topic}``.

        Returns:
            The decoded JSON body of the search response.

        Raises:
            requests.HTTPError: If either HTTP call returns an error status.
            requests.Timeout: If either HTTP call exceeds the timeout.
        """
        request_timeout = 30  # seconds; without a timeout requests can block forever

        auth = requests.auth.HTTPBasicAuth(
            config.get('REDDIT','user_id'),
            config.get('REDDIT','secret')
        )
        grant = {"grant_type": "password",
                 "username": config.get('REDDIT','username'),
                 "password": config.get('REDDIT','password')}

        token_response = requests.post('https://www.reddit.com/api/v1/access_token',
                                       data=grant,
                                       headers={'user-agent': 'reddit-data'},
                                       auth=auth,
                                       timeout=request_timeout)
        # Fail here with the HTTP status rather than on a missing 'access_token' key.
        token_response.raise_for_status()
        token = 'bearer ' + token_response.json()['access_token']

        headers = {'Authorization': token, 'User-Agent': 'reddit-data'}
        payload = {'q': f'r/{topic}', 'limit': 100, 'sort': 'new'}
        response = requests.get('https://oauth.reddit.com/search',
                                headers=headers,
                                params=payload,
                                timeout=request_timeout)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _clean_text(text):
        """Flatten ``text`` to one line: newlines become spaces, carriage returns are dropped."""
        # The original code removed the literal two characters "/r", which looks
        # like a typo for the carriage return and mangled paths such as "/r/python".
        return text.replace('\n',' ').replace('\r','')

    def format_data(self,values):
        """Flatten a Reddit listing into a list of upper-case keyed dicts.

        Args:
            values: Mapping shaped like ``{'data': {'children': [{'data': {...}}, ...]}}``,
                where every inner ``data`` dict holds the post fields read below.

        Returns:
            One dict per child, in input order.

        Raises:
            KeyError: If the listing or a post lacks one of the expected keys.
        """
        fetched_data = []
        for child in values['data']['children']:
            post = child['data']
            fetched_data.append({
                'TS': post['created'],
                'TS_UTC': post['created_utc'],
                'TITLE': self._clean_text(post['title']),
                'TEXT': self._clean_text(post['selftext']),
                'NSFW': post['over_18'],
                'VOTE_RATIO': float(post['upvote_ratio']),
                'SCORE': float(post['score']),
                'URL': post['url'],
                'USER_NAME': post['author'],
                'WLS': post['wls'],
                'SUBREDDIT': post['subreddit'],
                'SUBREDDIT_TYPE': post['subreddit_type'],
                'SUBREDDIT_SUBSCRIBER_COUNT': post['subreddit_subscribers'],
            })
        return fetched_data

    def produce_kafka(self,fetched_data,topic_name,bootstrap_servers='192.168.89.83:9092'):
        """Publish every record as a JSON message to ``topic_name``.

        The topic is created (1 partition, replication factor 1) when the
        broker does not list it yet. Delivery results are printed by the
        ``acked`` callback.

        Args:
            fetched_data: Iterable of JSON-serialisable records.
            topic_name: Kafka topic to produce to.
            bootstrap_servers: Broker address list; defaults to the address
                that was previously hard-coded.
        """
        conf = {'bootstrap.servers': bootstrap_servers,
                'client.id': socket.gethostname()}

        producer = Producer(conf)

        # Bounded metadata request so an unreachable broker does not hang the call.
        if topic_name not in producer.list_topics(timeout=10).topics:
            admin = AdminClient(conf)
            futures = admin.create_topics(
                new_topics=[NewTopic(topic=topic_name, num_partitions=1, replication_factor=1)],
                validate_only=False)
            # create_topics() is asynchronous: wait for the outcome so a failed
            # creation is reported instead of being silently dropped.
            for created_topic, future in futures.items():
                try:
                    future.result()
                except Exception as err:
                    print("Failed to create topic %s: %s" % (created_topic, str(err)))

        def acked(err, msg):
            """Delivery report callback: print the outcome of one message."""
            if err is not None:
                print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
            else:
                print("Message produced: %s" % (str(msg)))

        for record in fetched_data:
            producer.produce(topic_name, key="data", value=json.dumps(record), callback=acked)
            # Serve delivery callbacks as we go so the local queue keeps draining.
            producer.poll(0)

        # Block until all outstanding messages are delivered or have failed.
        producer.flush()

# SPARK 

In [2]:
from pyspark.sql.functions import date_format

In [5]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType,StringType, FloatType
import pyspark
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import date_format


full_path_to_warehouse = 's3a://warehouse'
# Nessie branch (ref) the catalog works against.
branch_name = "main"
# Nessie authentication type. Other options: NONE, BEARER, OAUTH2 or AWS.
auth_type = "NONE"
# MinIO is used instead of AWS S3; this endpoint keeps Spark on the local MinIO.
s3_endpoint = "http://192.168.89.83:9000"
# MinIO credentials. They are defined as root credentials in docker-compose and
# must not be used like this in production. Set MINIO_ACCESS_KEY / MINIO_SECRET_KEY
# to override them; the literals below are only the demo fallback.
# NOTE(review): these demo keys are committed in the notebook - rotate them and drop the fallback.
accessKeyId = os.environ.get('MINIO_ACCESS_KEY', 'dr6MxGKaZrSAh77gP8T0')
secretAccessKey = os.environ.get('MINIO_SECRET_KEY', 'czrPTk1rsUMzebG1UlJ0kki5VeGZPUYR5iIFX0af')
nessie_url = "http://192.168.89.83:19120/api/v1"

spark = (
    SparkSession.builder
    .master("local")
    .appName("Spark Unıty Iceberg Demo")
    .config("spark.driver.memory", "16g")
    .config('spark.jars.packages','org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2,org.apache.hadoop:hadoop-aws:3.4.0')

    # S3A / MinIO access
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.access.key", accessKeyId)
    .config("spark.hadoop.fs.s3a.secret.key", secretAccessKey)
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Send requests to the local MinIO instead of the default Amazon S3 API.
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    # Hadoop options are passed with the "spark.hadoop." prefix; the original
    # set this key twice without it.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")

    # Iceberg SQL extensions (needed for the branch / tag DDL used later)
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')

    # Nessie-backed Iceberg catalog registered under the name "nessie"
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .config("spark.sql.catalog.nessie.uri", nessie_url)
    .config("spark.sql.catalog.nessie.ref", branch_name)
    .config("spark.sql.catalog.nessie.authentication.type", auth_type)
    .config("spark.sql.catalog.nessie.warehouse", full_path_to_warehouse)
    .getOrCreate()
)

In [2]:
# Expected layout of the Reddit parquet files.
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("SCORE", FloatType(), True),
    StructField("SUBREDDIT", StringType(), True),
    StructField("ETL_DATE" ,StringType(), True),
    StructField("TITLE", StringType(), True),
    StructField("TEXT",StringType(), True),
    StructField("SUBREDDIT_TYPE",StringType(), True),
    StructField("SUBREDDIT_SUBSCRIBER_COUNT",IntegerType(), True),
    StructField("URL" ,StringType(), True),
    StructField("TS" ,StringType(), True),
    StructField("VOTE_RATIO",FloatType(), True),
    StructField("TOPIC",StringType(), True),
    StructField("USER_NAME",StringType(), True),
    StructField("WLS",FloatType(), True)
    ])

# The previous call passed inferSchema=True and schema=schema as keyword options.
# Neither is an option of DataFrameReader.parquet, so they were never applied and
# the frame has always used the schema stored in the files; the dead arguments
# are dropped here so the call says what it does.
# NOTE(review): to actually enforce `schema`, use spark.read.schema(schema).parquet(...)
# after confirming the stored column types (e.g. float vs double) match it.
df = spark.read.parquet("s3a://reddit/*")

df.show(2)


24/10/27 00:41:41 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+--------------------+-----+---------------+-------------------+--------------------+--------------------+--------------+--------------------------+--------------------+-------------------+----------+-----+---------------+---+
|            filename|SCORE|      SUBREDDIT|           ETL_DATE|               TITLE|                TEXT|SUBREDDIT_TYPE|SUBREDDIT_SUBSCRIBER_COUNT|                 URL|                 TS|VOTE_RATIO|TOPIC|      USER_NAME|WLS|
+--------------------+-----+---------------+-------------------+--------------------+--------------------+--------------+--------------------------+--------------------+-------------------+----------+-----+---------------+---+
|74090606-60cd-40a...|  4.0|       VietTalk|2024-10-19T16:00:14|Khi những máy cày...|Họ là những khách...|        public|                      4731|https://www.reddi...|2024-10-19T15:41:49|       1.0|  thy|fillapdesehules|6.0|
|62948792-f794-45a...|  1.0|IndianTeenagers|2024-10-19T16:00:14|✨MOST AWAITED SCH...|Hellll-

## Describe Data

In [3]:
# Row count per non-null USER_NAME, highest count first, coalesced to 2 partitions.
most_write_user = (
    df.where(df["USER_NAME"].isNotNull())
    .groupBy("USER_NAME")
    .count()
    .sort("count", ascending=False)
    .coalesce(2)
)
most_write_user.show()

                                                                                

+--------------------+-----+
|           USER_NAME|count|
+--------------------+-----+
|       CabianD_uTest|  985|
|         ar_david_hh|  399|
|     madscientist174|  379|
|       Faction_Chief|  376|
| subredditsummarybot|  375|
|   Far-Elephant-2612|  363|
|TearRepresentative56|  318|
|       AutoModerator|  312|
|           GiversBot|  293|
|       LivinAmiracle|  253|
|ThisIsARealAccountAP|  249|
|    Annabelle-Surely|  242|
|         OolongSippy|  238|
|           bmasumian|  235|
|             aproyal|  230|
|               MRGDN|  216|
|       xMysticChimez|  215|
|          BigBaibars|  206|
|              Fiff02|  206|
|        SkipperDipps|  202|
+--------------------+-----+
only showing top 20 rows



In [6]:
# Row count per calendar day; ETL_DATE is formatted down to "yyyy-MM-dd".
daily_message_df = (
    df.groupBy(date_format("ETL_DATE", "yyyy-MM-dd").alias("date"))
    .count()
)
daily_message_df.show()



+----------+-----+
|      date|count|
+----------+-----+
|2024-10-18| 7200|
|2024-10-17| 7200|
|2024-10-19| 6993|
|2024-10-16| 7200|
|2024-10-15| 7200|
|2024-10-13| 4550|
|2024-10-14| 7200|
|2024-10-23| 1200|
|2024-10-24| 1166|
|2024-10-22| 1200|
|2024-10-20| 1192|
|2024-10-26| 1102|
|2024-10-21| 1200|
|2024-10-25| 1200|
+----------+-----+



                                                                                

In [7]:
# Row count per (day, USER_NAME) pair; the day is ETL_DATE formatted as "yyyy-MM-dd".
most_subscriber_per_day_df = (
    df.groupBy(
        date_format("ETL_DATE", "yyyy-MM-dd").alias("date"),
        df["USER_NAME"],
    )
    .count()
)
most_subscriber_per_day_df.show()



+----------+--------------------+-----+
|      date|           USER_NAME|count|
+----------+--------------------+-----+
|2024-10-19|      TheMightyBox72|   23|
|2024-10-19|Unique_Homework_4065|   23|
|2024-10-18| HiMaintainceMachine|   14|
|2024-10-19|         TheLight123|   14|
|2024-10-19|        EarthPuma120|    5|
|2024-10-19|     Unable_Pool3236|    7|
|2024-10-19|            catronex|   24|
|2024-10-17|       Faction_Chief|   25|
|2024-10-17|         PeterLoew88|   24|
|2024-10-18|       Faction_Chief|   55|
|2024-10-19|        Neon_Genisis|    7|
|2024-10-19|Regular_Software_452|   21|
|2024-10-17|           Khrizizzo|   24|
|2024-10-17|   Medical_Poet_5490|   24|
|2024-10-19|              -sxmxd|    7|
|2024-10-18|               MRGDN|   48|
|2024-10-19|          steve32767|   24|
|2024-10-19|    GameProfessional|    6|
|2024-10-19|         ar_david_hh|   77|
|2024-10-19|        teaquiladiva|   23|
+----------+--------------------+-----+
only showing top 20 rows



                                                                                

In [8]:
# Row count per topic.
topic_count = (
    df.groupby("topic")
    .count()
)
topic_count.show()



+------+-----+
| topic|count|
+------+-----+
|   thy|24310|
|  cars|15819|
|turkey|15674|
+------+-----+



                                                                                

In [9]:
# Mean SCORE per topic; the generated "avg(SCORE)" column is renamed to AVG_SCORE.
avg_score_per_topic = (
    df.groupBy("topic")
    .agg({"SCORE": "avg"})
    .withColumnRenamed("avg(SCORE)", "AVG_SCORE")
)
avg_score_per_topic.show()



+------+-----------------+
| topic|        AVG_SCORE|
+------+-----------------+
|   thy|75.28918140682846|
|  cars|19.44433908590935|
|turkey|78.55040193951767|
+------+-----------------+



                                                                                

## Iceberg

In [11]:
from pyspark.sql.types import DateType


In [13]:
# Catalog and namespace used to build the table identifiers in the Iceberg cells.
CATALOG_NAME, DB_NAME = "nessie", "reddit"

## Create Table

In [None]:
# Create the namespace for the tables below; a no-op when it already exists.
spark.sql(
    "CREATE NAMESPACE IF NOT EXISTS {}.{};".format(CATALOG_NAME, DB_NAME)
)

In [12]:
# One entry per aggregate table: (table name, column definitions, partition column or None).
# most_subscriber_per_day now declares `date` as DATE (it was TIMESTAMP): the writer
# casts the column to DateType and daily_message already uses DATE for the same value.
# Because of IF NOT EXISTS, tables that already exist keep their current schema.
ICEBERG_TABLES = [
    ("daily_message_withpartitions",
     "date DATE, count INTEGER",
     "date"),
    ("most_write_user_withpartitions",
     "USER_NAME STRING, count INTEGER",
     "USER_NAME"),
    ("topic_count",
     "topic STRING, count INTEGER",
     None),
    ("most_subscriber_per_day_withpartitions",
     "date DATE, USER_NAME STRING, count INTEGER",
     "date"),
    ("avg_score_per_topic",
     "topic STRING, AVG_SCORE DOUBLE",
     None),
]

for table_name, column_defs, partition_column in ICEBERG_TABLES:
    # Unpartitioned tables simply omit the PARTITIONED BY clause.
    partition_clause = f"PARTITIONED BY ({partition_column})" if partition_column else ""
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{table_name} (
            {column_defs}
        )
        USING iceberg
        {partition_clause}
        """)

DataFrame[]

In [13]:
# Unpartitioned tables. insertInto matches columns by position, and "append"
# adds the rows again every time this cell is executed.
(
    avg_score_per_topic.write
    .mode("append")
    .insertInto(".".join([CATALOG_NAME, DB_NAME, "avg_score_per_topic"]))
)

(
    topic_count.write
    .mode("append")
    .insertInto(".".join([CATALOG_NAME, DB_NAME, "topic_count"]))
)


                                                                                

In [21]:
# Partitioned tables: cast the formatted day to a date, sort by it, then append.
(
    most_subscriber_per_day_df
    .withColumn("date", most_subscriber_per_day_df["date"].cast(DateType()))
    .sort("date")
    .writeTo("{}.{}.most_subscriber_per_day_withpartitions".format(CATALOG_NAME, DB_NAME))
    .append()
)


                                                                                

In [22]:
# Sort by the partition column (USER_NAME) and append to the partitioned table.
(
    most_write_user
    .sort("USER_NAME")
    .write
    .mode("append")
    .insertInto("{}.{}.most_write_user_withpartitions".format(CATALOG_NAME, DB_NAME))
)


                                                                                

In [None]:
# Cast the formatted day to a date, sort by it, then append to the partitioned table.
# The original also sorted once before the cast; that first sort was redundant
# because the sort after the cast decides the final order.
daily_message_df\
    .withColumn("date",daily_message_df['date'].cast(DateType()))\
    .orderBy("date")\
    .write.mode("append").insertInto(f"{CATALOG_NAME}.{DB_NAME}.daily_message_withpartitions")



## Branch

In [None]:
# Create the "development" branch on the daily_message table.
# IF NOT EXISTS makes the cell safe to re-run; a plain CREATE BRANCH fails
# once the branch already exists.
URL = f"{CATALOG_NAME}.{DB_NAME}.daily_message_withpartitions"  # table identifier, not a web URL
print(URL)
spark.sql(f"ALTER TABLE {URL} CREATE BRANCH IF NOT EXISTS development")

In [14]:
# Append the daily counts to the "development" branch of the table.
(
    daily_message_df
    .withColumn("date", daily_message_df["date"].cast(DateType()))
    .write
    .format("iceberg")
    .option("branch", "development")
    .mode("append")
    .save("{}.{}.daily_message_withpartitions".format(CATALOG_NAME, DB_NAME))
)

                                                                                

In [17]:
# Read the table as of the "development" branch, newest day first.
# The identifier is built from CATALOG_NAME / DB_NAME like the other cells
# instead of repeating the literal "nessie.reddit".
spark.sql(f"SELECT * FROM {CATALOG_NAME}.{DB_NAME}.daily_message_withpartitions VERSION AS OF 'development' ORDER BY date DESC ;").show()



+----------+-----+
|      date|count|
+----------+-----+
|2024-10-26| 1102|
|2024-10-25| 1200|
|2024-10-24| 1166|
|2024-10-23| 1200|
|2024-10-22| 1200|
|2024-10-22| 1200|
|2024-10-22| 1200|
|2024-10-21| 1200|
|2024-10-21| 1200|
|2024-10-21| 1200|
|2024-10-20| 1192|
|2024-10-20| 1192|
|2024-10-20| 1124|
|2024-10-20| 1192|
|2024-10-19| 6993|
|2024-10-19| 6993|
|2024-10-19| 6993|
|2024-10-19| 6993|
|2024-10-18| 7200|
|2024-10-18| 7200|
+----------+-----+
only showing top 20 rows



                                                                                

In [None]:
# delete branch
#spark.sql("ALTER TABLE glue.test.employees DROP BRANCH ML_exp")

## TAG

In [15]:
# Tag the table as 22_OCT_REPORT. IF NOT EXISTS makes the cell safe to re-run;
# a plain CREATE TAG fails once the tag already exists.
spark.sql(f"ALTER TABLE {CATALOG_NAME}.{DB_NAME}.daily_message_withpartitions CREATE TAG IF NOT EXISTS 22_OCT_REPORT")

DataFrame[]

In [48]:
# Read the table as of the 22_OCT_REPORT tag (addressed as "<table>.tag_<name>").
# The identifier is built from CATALOG_NAME / DB_NAME like the other cells
# instead of repeating the literal "nessie.reddit".
spark.sql(f"""
    SELECT * FROM  
    {CATALOG_NAME}.{DB_NAME}.daily_message_withpartitions.tag_22_OCT_REPORT
    """).show()

+----------+-----+
|      date|count|
+----------+-----+
|2024-10-13| 4550|
|2024-10-16| 7200|
|2024-10-17| 7200|
|2024-10-14| 7200|
|2024-10-15| 7200|
|2024-10-20| 1124|
|2024-10-18| 7200|
|2024-10-19| 6993|
+----------+-----+



## Time Travel

In [51]:
# Show the table's history metadata table.
spark.sql(
    "SELECT * FROM {}.{}.daily_message_withpartitions.history;".format(CATALOG_NAME, DB_NAME)
).show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2024-10-20 16:35:...|7759802095667761594|               NULL|               true|
|2024-10-22 22:16:...|4048158043975537361|7759802095667761594|               true|
+--------------------+-------------------+-------------------+-------------------+



## Snapshots

In [52]:
# Show the table's snapshots metadata table.
spark.sql(
    "SELECT * FROM {}.{}.daily_message_withpartitions.snapshots;".format(CATALOG_NAME, DB_NAME)
).show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-10-20 16:35:...|7759802095667761594|               NULL|   append|s3a://warehouse/r...|{spark.app.id -> ...|
|2024-10-22 22:16:...|4048158043975537361|7759802095667761594|   append|s3a://warehouse/r...|{spark.app.id -> ...|
|2024-10-22 22:31:...|7580483639421632325|7759802095667761594|   append|s3a://warehouse/r...|{spark.app.id -> ...|
|2024-10-22 22:31:...|2404012265844688689|7580483639421632325|   append|s3a://warehouse/r...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [53]:
# Time travel: read the table as of one specific snapshot id. The id is
# hard-coded, so this cell only works against a table that has this snapshot.
spark.sql(
    "SELECT * FROM {}.{}.daily_message_withpartitions VERSION AS OF 2404012265844688689;".format(
        CATALOG_NAME, DB_NAME
    )
).show()

+----------+-----+
|      date|count|
+----------+-----+
|2024-10-13| 4550|
|2024-10-13| 4550|
|2024-10-16| 7200|
|2024-10-16| 7200|
|2024-10-17| 7200|
|2024-10-17| 7200|
|2024-10-14| 7200|
|2024-10-14| 7200|
|2024-10-15| 7200|
|2024-10-15| 7200|
|2024-10-20| 1192|
|2024-10-20| 1124|
|2024-10-21| 1200|
|2024-10-18| 7200|
|2024-10-18| 7200|
|2024-10-19| 6993|
|2024-10-19| 6993|
|2024-10-22| 1200|
|2024-10-13| 4550|
|2024-10-16| 7200|
+----------+-----+
only showing top 20 rows



In [34]:
#spark.stop()