In [34]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

from pyspark.sql.types import StringType
from pyspark.sql import functions as F
import pyspark.sql.types as T
from pyspark.sql.types import StructType, StructField, StringType
from textblob import TextBlob

In [4]:
# Service-account key file passed to the GCS connector for authentication.
# Alternative key file (currently disabled):
# credentials_location = '/home/faisal/my_projects/de-reddit-reports/keys/de-reddit-reports-f9479aba34a3.json'
credentials_location = '/home/faisal/.gc/ny-rides.json'

# Connector jars shipped with the local Spark application (paths are relative
# to the notebook's working directory).
connector_jars = [
    "../lib/gcs-connector-hadoop3-2.2.5.jar",
    "../lib/spark-3.5-bigquery-0.36.1.jar",
]

# Build the Spark configuration step by step: local master using all cores,
# the connector jars, and service-account authentication for Google Cloud.
conf = SparkConf()
conf.setMaster('local[*]')
conf.setAppName('test')
conf.set("spark.jars", ", ".join(connector_jars))
conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [5]:
# Report the GCP project id set on the Spark conf, or a placeholder message
# when the key has not been configured.
default_project_id = conf.get(
    "spark.hadoop.google.cloud.project.id",
    defaultValue="No default project configured",
)
print("Default Project ID:", default_project_id)

Default Project ID: No default project configured


In [6]:
# Reuse the active SparkContext when this cell is re-executed. Calling
# SparkContext(conf=conf) directly raises ValueError if a context is already
# running in the kernel, which makes the cell fail on any re-run.
# NOTE: when a context already exists, `conf` is not re-applied; restart the
# kernel to pick up configuration changes.
sc = SparkContext.getOrCreate(conf=conf)

# Hadoop configuration of the underlying JVM context (accessed through the
# private `_jsc` handle, as PySpark exposes no public accessor for it).
hadoop_conf = sc._jsc.hadoopConfiguration()

# Register the GCS connector classes for the `gs://` scheme.
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
# Authenticate GCS access with the service-account key file.
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

24/03/07 20:52:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
# Create (or fetch) the SparkSession, seeding the builder with the
# configuration of the SparkContext created above.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

In [8]:
# Bucket name stored under the session option 'temporaryGcsBucket'
# (presumably read by the BigQuery connector to stage data during writes —
# confirm against the connector documentation).
temporary_gcs_bucket = 'dataproc-staging-na-northeast1-667281304179-9us80iur'
spark.conf.set('temporaryGcsBucket', temporary_gcs_bucket)

In [16]:
# Disabled: alternative definition of df_post that reads a post/comment
# parquet file from GCS instead of building the DataFrame from inline rows.
# df_post = (spark.read
#            .parquet('gs://reddit-terra-bucket/subreddit/dataengineering/staging/post_comment/2024/2/24/post_comment.parquet')
#           )

In [35]:
# Column names of the post/comment table, in schema order.
column_names = [
    "title",
    "score",
    "num_comments",
    "post_author",
    "post_created_utc",
    "url",
    "upvote_ratio",
    "over_18",
    "edited",
    "spoiler",
    "stickied",
    "selftext",
    "post_sentiment_score",
    "post_sentiment_label",
    "comment_id",
    "comment_body",
    "comment_author",
    "comment_created_utc",
    "comment_score",
    "comment_edited",
    "comment_stickied",
    "comment_parent_id",
    "comment_num_replies",
    "permalink",
    "comment_sentiment_score",
    "comment_sentiment_label",
    "post_id",
]

# Every column is declared as a nullable string.
schema = StructType([StructField(name, StringType(), True) for name in column_names])


def _sample_row(comment_id, comment_num_replies):
    """Return one sample row (a 27-tuple of strings matching `schema`).

    Only the comment id and the reply count vary between the sample rows;
    all other fields carry fixed placeholder values.
    """
    return (
        "Sample Title",                   # title
        "10",                             # score
        "5",                              # num_comments
        "Author",                         # post_author
        "2024-03-07",                     # post_created_utc
        "https://example.com",            # url
        "0.8",                            # upvote_ratio
        "false",                          # over_18
        "false",                          # edited
        "false",                          # spoiler
        "false",                          # stickied
        "Sample selftext",                # selftext
        "0.5",                            # post_sentiment_score
        "positive",                       # post_sentiment_label
        comment_id,                       # comment_id
        "Sample comment",                 # comment_body
        "Comment Author",                 # comment_author
        "2024-03-07",                     # comment_created_utc
        "5",                              # comment_score
        "false",                          # comment_edited
        "false",                          # comment_stickied
        "1ay98md",                        # comment_parent_id
        comment_num_replies,              # comment_num_replies
        "https://example.com/permalink",  # permalink
        "0.7",                            # comment_sentiment_score
        "positive",                       # comment_sentiment_label
        "1ay98md",                        # post_id
    )


# Sample data: two comments on the same post.
data = [
    _sample_row("krtngi5", "2"),
    _sample_row("krtjxjq", "999"),
]

# Build the DataFrame from the sample rows using the explicit schema.
df_post = spark.createDataFrame(data, schema=schema)

In [36]:
# Display (up to 3 rows of) the records whose comment_id is 'krtngi5'.
matching_comment = df_post.filter(df_post["comment_id"] == 'krtngi5')
matching_comment.show(3)

                                                                                

+------------+-----+------------+-----------+----------------+-------------------+------------+-------+------+-------+--------+---------------+--------------------+--------------------+----------+--------------+--------------+-------------------+-------------+--------------+----------------+-----------------+-------------------+--------------------+-----------------------+-----------------------+-------+
|       title|score|num_comments|post_author|post_created_utc|                url|upvote_ratio|over_18|edited|spoiler|stickied|       selftext|post_sentiment_score|post_sentiment_label|comment_id|  comment_body|comment_author|comment_created_utc|comment_score|comment_edited|comment_stickied|comment_parent_id|comment_num_replies|           permalink|comment_sentiment_score|comment_sentiment_label|post_id|
+------------+-----+------------+-----------+----------------+-------------------+------------+-------+------+-------+--------+---------------+--------------------+--------------------

In [37]:
# Number of rows in df_post; shown as the cell's output.
df_post.count()
# Disabled schema inspection:
# df_post.printSchema()

2

In [38]:
# Write df_post to BigQuery, replacing any existing contents of the target table.
bq_writer = df_post.write.format('bigquery')
bq_writer = bq_writer.option('table', 'de-reddit-reports.reddit_dataset.dataengineering_staging')
bq_writer = bq_writer.mode("overwrite")
bq_writer.save()

                                                                                