In [None]:
%spark.pyspark


In [1]:
%spark.pyspark
# Load the 2024-04-01 BIST snapshot; the first CSV row supplies the column names.
df = (
    spark.read
    .option("header", True)
    .csv("s3://rumors3/bist_live_daily/bist_live_2024-04-01.csv")
)

In [2]:
%spark.pyspark
# Print the first rows of the loaded frame as a quick sanity check.
df.show()


In [3]:
%spark.pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit,regexp_replace, col
from datetime import datetime, timedelta
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql import Row
from pyspark.sql.functions import col,expr,format_number
from pyspark.sql.functions import col, lit, min

# Reuse the active SparkSession if one exists, otherwise start one
# under the application name given here.
spark = (
    SparkSession.builder
    .appName("Read CSV Example")
    .getOrCreate()
)


# Each daily file is read with a header row and inferred column types, then
# tagged with its trading date. The 03-28 and 03-27 frames additionally get
# '18:00' (and '17:00' for 03-27) set to a copy of an earlier hour's column.
# NOTE(review): presumably those two files stop at 17:00 / 16:00 - confirm.
df1 = spark.read.csv("s3://rumors3/bist_live_daily/bist_live_2024-03-28.csv", header=True, inferSchema=True)
df1 = df1.withColumn('18:00', df1['17:00'])
df1 = df1.withColumn('Date', lit('2024-03-28'))

df2 = spark.read.csv("s3://rumors3/bist_live_daily/bist_live_2024-03-27.csv", header=True, inferSchema=True)
df2 = df2.withColumn('17:00', df2['16:00'])
df2 = df2.withColumn('18:00', df2['16:00'])
df2 = df2.withColumn('Date', lit('2024-03-27'))

df3 = spark.read.csv("s3://rumors3/bist_live_daily/bist_live_2024-04-01.csv", header=True, inferSchema=True)
df3 = df3.withColumn('Date', lit('2024-04-01'))

df4 = spark.read.csv("s3://rumors3/bist_live_daily/bist_live_2024-03-29.csv", header=True, inferSchema=True)
df4 = df4.withColumn('Date', lit('2024-03-29'))

# Combine on column NAME rather than position: union() pairs columns purely by
# index, so any difference in column order between the daily frames would
# silently file values under the wrong hour. unionByName raises instead if the
# column sets differ.
union_df = df1.unionByName(df2).unionByName(df3).unionByName(df4)

# Unpivot the hourly columns into (hour, value) rows, keeping stock and date.
# NOTE(review): '18:00' exists in union_df but is not part of this stack list -
# confirm whether leaving it out of the statistics is intended.
melted_df = union_df.selectExpr(
    "stock_name",
    "Date",
    "stack(8, "
    "'10:00', `10:00`, "
    "'11:00', `11:00`, "
    "'12:00', `12:00`, "
    "'13:00', `13:00`, "
    "'14:00', `14:00`, "
    "'15:00', `15:00`, "
    "'16:00', `16:00`, "
    "'17:00', `17:00`"
    ") as (hour, value)",
)

# Per stock and date: min, max, mean and standard deviation of the hourly values.
# avg/stddev go through format_number with 2 decimals.
stock_stats_df = (
    melted_df
    .groupBy("stock_name", "Date")
    .agg(
        expr("min(value)").alias("min_value"),
        expr("max(value)").alias("max_value"),
        format_number(expr("avg(value)"), 2).alias("avg_value"),
        format_number(expr("stddev(value)"), 2).alias("std_value"),
    )
)

# Preview the statistics first, then the combined source frame.
stock_stats_df.show()
union_df.show()

# Persist both frames to S3 as Parquet, coalesced to one partition each so
# every dataset is written from a single partition.
output_path = 's3://rumors3/union_df.parquet'
output_path2 = 's3://rumors3/stock_stats_df.parquet'

# mode("overwrite") makes the cell re-runnable: with the default save mode the
# write raises as soon as the target path already exists from an earlier run.
df_single_partition = union_df.coalesce(1)
df_single_partition.write.mode("overwrite").parquet(output_path)

stock_stats_df_single = stock_stats_df.coalesce(1)
stock_stats_df_single.write.mode("overwrite").parquet(output_path2)

# The SparkSession is deliberately NOT stopped here: later cells keep using the
# same `spark` object, and calls on a stopped session fail.

In [4]:
%spark.pyspark
# Re-read the 2024-03-28 snapshot (header row, inferred column types) and preview it.
df1 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("s3://rumors3/bist_live_daily/bist_live_2024-03-28.csv")
)

df1.show()


In [5]:
%spark.pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit,regexp_replace, col
from datetime import datetime, timedelta
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Reuse the active SparkSession if one exists, otherwise start one
# under the application name given here.
spark = (
    SparkSession.builder
    .appName("Read JSON Data")
    .getOrCreate()
)

# Today's calendar date (note: this is today, not yesterday).
todate = datetime.today().date()

# Dates (YYYY-MM-DD) whose daily files are loaded below, newest first.
# The first entry previously lacked its closing quote, which made the whole
# cell a SyntaxError.
days = ['2024-04-03', '2024-04-02', '2024-04-01', '2024-03-29', '2024-03-28', '2024-03-27', '2024-03-26']

# Second getOrCreate() in this cell.
# NOTE(review): a session was already obtained a few lines above, so this call
# looks redundant - confirm before removing.
spark = (
    SparkSession.builder
    .appName("Empty DataFrame Creation")
    .getOrCreate()
)

# Column layout of the (initially empty) KAP analysis frame; all fields nullable.
schema_analysis = (
    StructType()
    .add("comment", StringType(), True)
    .add("date", DateType(), True)
    .add("disc_index", IntegerType(), True)
    .add("ticker", StringType(), True)
)

# Start from an empty frame and append one file per day to it.
analysis_df = spark.createDataFrame([], schema_analysis)

# Each daily file is read as multi-line JSON with no explicit schema and is
# appended with union(), which pairs columns by position.
for day in days:
    analysis_path = f"s3://rumors3/KAP/KAP_analysis/analysis_results_{day}.json"

    multiline_reader = spark.read.option("multiline", "true")
    df = multiline_reader.json(analysis_path)
    analysis_df = analysis_df.union(df)

# Column layout of the (initially empty) KAP statement frame; all fields nullable.
schema_statement = (
    StructType()
    .add("company_name", StringType(), True)
    .add("disc_index", IntegerType(), True)
    .add("text", StringType(), True)
)

# Start from an empty frame and append one file per day to it.
statement_df = spark.createDataFrame([], schema_statement)

# Each daily file is read as multi-line JSON with no explicit schema and is
# appended with union(), which pairs columns by position.
for day in days:
    statement_path = f"s3://rumors3/KAP/KAP_statements/statement_{day}.json"

    multiline_reader = spark.read.option("multiline", "true")
    df = multiline_reader.json(statement_path)
    statement_df = statement_df.union(df)
    
# Alias both sides so the shared disc_index column can be referenced unambiguously.
analysis_df_alias = analysis_df.alias("analysis")
statement_df_alias = statement_df.alias("statement")

# Inner-join analysis rows to statement rows on disc_index, then drop the
# statement-side copy of the key and the ticker column.
joined_df = (
    analysis_df_alias
    .join(
        statement_df_alias,
        on=col("analysis.disc_index") == col("statement.disc_index"),
        how="inner",
    )
    .drop(col("statement.disc_index"))
    .drop('ticker')
)



# Expose the joined frame as an RDD of Row objects for row-level processing.
rdd = joined_df.rdd

def replace_newline(row):
    """Return the row as a dict with line feeds removed from its ``comment``.

    Args:
        row: Row-like object exposing ``asDict()``; the resulting dict must
            contain a ``'comment'`` key (a missing key raises ``KeyError``).

    Returns:
        dict: The row's fields, with every ``"\\n"`` stripped from ``comment``.
        A ``None`` comment is passed through unchanged instead of raising
        ``AttributeError``.
    """
    row_dict = row.asDict()
    comment = row_dict['comment']
    # The comment column is nullable, so guard before calling str.replace.
    if comment is not None:
        row_dict['comment'] = comment.replace("\n", "")
    return row_dict

# Run replace_newline over every joined record (each result is a dict).
new_rdd = rdd.map(replace_newline)

# Count records per (company_name, date) and flatten each result to
# (company_name, date, count).
new_rdd2 = (
    new_rdd
    .map(lambda rec: ((rec['company_name'], rec['date']), 1))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[0][0], pair[0][1], pair[1]))
)

count_df = spark.createDataFrame(new_rdd2, schema=['company_name', 'date', 'count'])

# Rebuild a DataFrame from the cleaned records using the joined frame's schema.
new_df = spark.createDataFrame(new_rdd, schema=joined_df.schema)

# Preview both results.
new_df.show()
count_df.show()

# Persist the cleaned KAP records and the per-company counts to S3 as Parquet,
# coalesced to one partition each.
output_path = 's3://rumors3/kap_analysis.parquet'
output_path2 = 's3://rumors3/kap_count.parquet'

# mode("overwrite") makes the cell re-runnable: with the default save mode the
# write raises as soon as the target path already exists from an earlier run.
df_single_partition = new_df.coalesce(1)
df_single_partition.write.mode("overwrite").parquet(output_path)

df_single_partition2 = count_df.coalesce(1)
df_single_partition2.write.mode("overwrite").parquet(output_path2)

# The SparkSession is deliberately NOT stopped here: new_df and count_df are
# used again by the next cell, and frames bound to a stopped session can no
# longer be evaluated.

In [6]:
%spark.pyspark

# Write the KAP results to S3 again. This cell relies on new_df and count_df
# already being defined by an earlier cell.
output_path = 's3://rumors3/kap_analysis.parquet'
output_path2 = 's3://rumors3/kap_count.parquet'

# These are the same target paths the previous cell writes to, so the default
# save mode would fail with "path already exists"; overwrite explicitly.
df_single_partition = new_df.coalesce(1)
df_single_partition.write.mode("overwrite").parquet(output_path)

df_single_partition2 = count_df.coalesce(1)
df_single_partition2.write.mode("overwrite").parquet(output_path2)

In [7]:
%spark.pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, regexp_replace, col
from datetime import datetime, timedelta
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Reuse the active SparkSession if one exists, otherwise start one
# under the application name given here.
spark = (
    SparkSession.builder
    .appName("Read Comments Data")
    .getOrCreate()
)

# Today's calendar date (note: this is today, not yesterday); used further
# down to pick the most-commented stocks for the current day.
todate = datetime.today().date()

# Dates (YYYY-MM-DD) whose daily comment files are loaded below, newest first.
# The first entry previously lacked its closing quote, which made the whole
# cell a SyntaxError.
days = ['2024-04-03', '2024-04-02', '2024-04-01', '2024-03-29', '2024-03-28', '2024-03-27', '2024-03-26']

# Column layout applied to every daily comments file; all fields nullable.
# "Comment Time" is read as a plain string, "Date" as a date.
schema_comments = (
    StructType()
    .add("Stock", StringType(), True)
    .add("User", StringType(), True)
    .add("Comment", StringType(), True)
    .add("Comment Time", StringType(), True)
    .add("Date", DateType(), True)
)

# Start from an empty frame and append one file per day to it.
comments_df = spark.createDataFrame([], schema_comments)

# Each daily file is read as multi-line JSON with the schema above.
for day in days:
    comments_path = f"s3://rumors3/bist_comments/bist_comments_{day}.json"

    df = (
        spark.read
        .option("multiline", "true")
        .schema(schema_comments)
        .json(comments_path)
    )
    comments_df = comments_df.union(df)

# Only Stock, Comment and Date are needed from here on.
comments_df = comments_df.drop('Comment Time', 'User')

# Comments per (Stock, Date), flattened to (Stock, Date, count) tuples.
rdd = (
    comments_df.rdd
    .map(lambda rec: ((rec['Stock'], rec['Date']), 1))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair: (pair[0][0], pair[0][1], pair[1]))
)

# Raw comment texts only.
rdd2 = comments_df.rdd.map(lambda rec: rec['Comment'])

# Stocks with the most comments dated `todate`, highest count first.
# NOTE(review): todate is the date the cell is run, so this selects nothing
# unless the loaded files contain that day - confirm this is intended.
top_comments = (
    rdd
    .filter(lambda triple: triple[1] == todate)
    .sortBy(lambda triple: triple[2], ascending=False)
)
top_5_elements = top_comments.take(5)

# Lower-case each comment and split it on whitespace into a list of words.
# The Comment column is nullable; a missing comment becomes an empty word list
# instead of raising AttributeError on None.lower().
comments_rdd = comments_df.rdd.map(lambda x: (x['Comment'] or '').lower().split())

# Turkish stop words plus a few short tokens and digits to ignore.
turkish_stop_words = ['acaba', 'ama', 'aslında', 'az', 'bazı', 'belki', 'biri', 'birkaç', 'birşey', 'biz', 'bu',
                      'çok', 'çünkü', 'da', 'daha', 'de', 'defa', 'diye', 'eğer', 'en', 'gibi', 'hem', 'hep',
                      'hepsi', 'her', 'hiç', 'için', 'ile', 'ise', 'kez', 'ki', 'kim', 'mı', 'mu', 'mü', 'nasıl',
                      'ne', 'neden', 'nerde', 'nerede', 'nereye', 'niçin', 'niye', 'o', 'sanki', 'şey', 'siz', 'şu',
                      'tüm', 've', 'veya', 'ya', 'yani','bir','var','cok','mi','kadar','bi','1','2','3','e','a','5','10']

# Membership is tested once per word, so look words up in a set rather than
# scanning the list each time; the filtering result is identical.
_stop_word_set = frozenset(turkish_stop_words)
filtered_comments_rdd = comments_rdd.map(
    lambda words: [word for word in words if word not in _stop_word_set]
)

# Count occurrences of every remaining word, most frequent first, and bring
# the (word, count) pairs back to the driver as a list.
word_counts = (
    filtered_comments_rdd
    .flatMap(lambda words: [(word, 1) for word in words])
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False)
    .collect()
)


# Turn the RDD / list results into DataFrames.
# Explicit DDL schemas are used instead of bare column-name lists: with names
# only, Spark has to infer the column types from the data and raises when the
# input is empty (e.g. no comments dated `todate`, so top_5_elements == []).
# The declared types match what inference yields for str / date / int values.
_stock_count_schema = "`Stock` string, `Date` date, `CommentCount` bigint"
_word_count_schema = "`Word` string, `Count` bigint"

count_df = spark.createDataFrame(rdd, _stock_count_schema).orderBy('Date','CommentCount',ascending=False)
top_5_elements_df = spark.createDataFrame(top_5_elements, _stock_count_schema)
word_counts_df = spark.createDataFrame(word_counts, _word_count_schema)

# Preview the three result frames.
top_5_elements_df.show()
count_df.show()
word_counts_df.show()

# Persist the three comment result frames to S3 as Parquet, coalesced to one
# partition each.
output_path = 's3://rumors3/comments_top_5.parquet'
output_path2 = 's3://rumors3/comments_count.parquet'
output_path3 = 's3://rumors3/comments_word_count.parquet'

# mode("overwrite") makes the cell re-runnable: with the default save mode the
# write raises as soon as the target path already exists from an earlier run.
df_single_partition = top_5_elements_df.coalesce(1)
df_single_partition.write.mode("overwrite").parquet(output_path)

df_single_partition2 = count_df.coalesce(1)
df_single_partition2.write.mode("overwrite").parquet(output_path2)

df_single_partition3 = word_counts_df.coalesce(1)
df_single_partition3.write.mode("overwrite").parquet(output_path3)

# Shut down the underlying SparkContext once all outputs are written.
# NOTE(review): any cell run after this one (including a re-run from the top
# that uses the existing `spark` object) will fail until a new session is
# available - confirm stopping here is wanted.
spark.sparkContext.stop()


In [8]:
%spark.pyspark
