In [None]:
import os
import findspark

In [None]:
# Runs before the pyspark imports in the next cell.
# NOTE(review): presumably needed so that pyspark can be imported from the
# local Spark installation — confirm against the target environment.
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, format_number

In [None]:
# Build the Spark session (application name "UsStock") used by the cells below.
spark = (
    SparkSession.builder
    .appName("UsStock")
    .getOrCreate()
)

In [None]:
import os
# Locate the US stock overview snapshot relative to the current working
# directory. The relative part is assembled with os.path.join rather than a
# hard-coded '\\' separator so the path resolves on POSIX systems as well as
# on Windows.
root_path = os.getcwd()
us_stock_overview_csv = os.path.join('history', '20250525154117.csv')
us_stock_overview_csv_path = os.path.join(root_path, us_stock_overview_csv)

In [None]:
# Load the Chinese industry name for each symbol so it can be joined onto the
# English-language overview.
# The relative part of the path is assembled with os.path.join rather than a
# hard-coded '\\' separator so it resolves on POSIX systems as well as on
# Windows.
chi_stock_overview_csv = os.path.join('history', '20250525163420.csv')
chi_stock_overview_csv_path = os.path.join(root_path, chi_stock_overview_csv)
# An explicit schema is supplied, so column names and types come from here
# rather than from inference.
schema = StructType([
    StructField("No", IntegerType(), True),
    StructField("Symbol", StringType(), True),
    StructField("Chi_Industry", StringType(), True)
])
chinese_df = spark.read.csv(chi_stock_overview_csv_path, header=True, schema=schema)
chinese_df.printSchema()

In [None]:
# Load the US overview with inferred column types, then attach the Chinese
# industry name to each symbol. The inner join keeps only symbols that appear
# in both files.
original_df = spark.read.csv(us_stock_overview_csv_path, header=True, inferSchema=True)
original_df = original_df.join(
    # Column name spelled exactly as declared in the schema ("Chi_Industry")
    # so the lookup does not rely on Spark's case-insensitive column
    # resolution (it would fail with spark.sql.caseSensitive=true).
    chinese_df.select("Symbol", "Chi_Industry"),
    on='Symbol',
    how='inner',
)

In [None]:
# Print the schema of the joined frame (inferred US columns plus the Chinese industry column).
original_df.printSchema()

In [None]:
# Keep rows whose "Market Cap" exceeds 100,000,000 and whose trimmed
# "Industry" is not the '-' placeholder, then project the columns used by the
# analysis below.
columns = ["Symbol", "% Chg", "Market Cap", "Industry", "Chi_Industry"]
above_cap_floor = F.col("Market Cap") > 100000000
has_industry = F.trim(F.col("Industry")) != '-'
filtered_df = original_df.filter(above_cap_floor & has_industry).select(columns)
filtered_df.printSchema()

In [None]:
def percentage_to_float(percentage_str):
    """Convert a percentage string such as ``'12.5%'`` into a fraction.

    Args:
        percentage_str: Text holding a number with an optional ``%`` sign
            (surrounding whitespace is tolerated), or ``None``.

    Returns:
        The numeric value divided by 100 as a ``float``, or ``None`` when the
        input is ``None``.

    Raises:
        ValueError: If the text left after removing whitespace and ``%`` is
            not a valid number.
    """
    # A Spark UDF is handed None for null cells; pass it through so the cell
    # stays null instead of failing with AttributeError on None.strip.
    if percentage_str is None:
        return None
    # Trim whitespace first so a trailing space does not shield the '%' sign.
    return float(percentage_str.strip().strip('%')) / 100

# Wrap the converter as a Spark UDF returning FloatType and overwrite "% Chg"
# with the value it produces.
percentage_udf = udf(percentage_to_float, FloatType())
pct_change_raw = filtered_df["% Chg"]
filtered_df_transformed = filtered_df.withColumn(
    "% Chg",
    percentage_udf(pct_change_raw),
)

filtered_df_transformed.printSchema()

In [None]:
# Per-industry summary: mean "% Chg" rounded to 4 decimals, a count of rows,
# and the largest single "% Chg", sorted with the highest mean first.
industry_groups = (
    filtered_df_transformed
    .select("Industry", "Chi_Industry", "% Chg", "Symbol")
    .groupBy("Industry", "Chi_Industry")
)
avg_industry_df = (
    industry_groups
    .agg(
        F.round(F.avg("% Chg"), 4).alias("Total Industry Change Rate"),
        F.count("Industry").alias("Total Stock"),
        F.max("% Chg").alias("Best Contribution"),
    )
    .orderBy(F.col("Total Industry Change Rate").desc())
)

# The row count is passed to show() so that every row is printed.
avg_industry_df.show(avg_industry_df.count())

In [None]:
# Attach to each industry row the symbol whose "% Chg" equals that industry's
# "Best Contribution", and expose it as "Best Stock".
# NOTE(review): the join matches on Industry and the change value only, so an
# industry with several stocks tied at the maximum yields one row per stock.
filtered_df_with_column_transformed = filtered_df_transformed.select("Symbol", "Industry", "% Chg")
df_as1 = avg_industry_df.alias("df_as1")
df_as2 = filtered_df_with_column_transformed.alias("df_as2")

is_best_change = F.col("df_as1.Best Contribution") == F.col("df_as2.% Chg")
same_industry = F.col("df_as1.Industry") == F.col("df_as2.Industry")

avg_industry_with_best_contribution_df = (
    df_as1
    .join(df_as2, is_best_change & same_industry)
    .select(df_as1["*"], "df_as2.Symbol")
    .orderBy(F.col("Total Industry Change Rate").desc())
    .withColumnRenamed("Symbol", "Best Stock")
)
# The row count is passed to show() so that every row is printed.
avg_industry_with_best_contribution_df.show(avg_industry_with_best_contribution_df.count())