In [120]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for this notebook.
spark = SparkSession.builder \
    .appName("stock") \
    .getOrCreate()



# pandas equivalent kept for reference (left over from a Titanic example):
# pandas_df = pd.read_csv('titanic_train.csv', header='infer')

# Load the CSV into a Spark DataFrame with spark.read.csv().
# nullValue='NULL' makes Spark read the literal string 'NULL' as a null value.
# NOTE: inferSchema=True turns digit-only code columns into integers, so zero-padded
# codes (e.g. 기업고유번호) likely lose their leading zeros in this preview. stock_sdf is
# rebuilt from stock_pdf in cell 121, so this DataFrame is only used for the preview.
stock_sdf = spark.read.csv('./data/stock_info_0315.csv', header=True, inferSchema=True,  nullValue='NULL')
print('stock sdf type:', type(stock_sdf))

stock_sdf.show(5)

# Cache the Spark DataFrame in memory.
stock_sdf = stock_sdf.cache()



# Load the same file with pandas.
import pandas as pd

# Treat the literal string 'NULL' as a missing value (NaN).
na_values = ['NULL']
# Read the code columns as strings: otherwise pandas infers digit-only codes such as
# '000020' (stock code) or the zero-padded company id as integers and drops the leading zeros.
code_dtypes = {'종목코드': str, '기업고유번호': str}
stock_pdf = pd.read_csv('./data/stock_info_0315.csv', header='infer', na_values=na_values, dtype=code_dtypes)


stock sdf type: <class 'pyspark.sql.dataframe.DataFrame'>
+--------+------------+-----+------+------------+----------+------+-----+-------+-------+--------+----+----------+----------+--------------+------------+--------------+----------------+-----+-----+-----+------+----------+------------+--------+--------+--------------+--------+
|종목코드|      종목명| 종가|등락률|    시가총액|    기준일|   eps|  per|선행eps|선행per|     bps| pbr|주당배당금|배당수익률|외국인보유수량|외국인지분율|외국인한도수량|외국인한도소진율| 시가| 고가| 저가|거래량|  거래대금|기업고유번호|시장구분|종목구분|          섹터|  업종명|
+--------+------------+-----+------+------------+----------+------+-----+-------+-------+--------+----+----------+----------+--------------+------------+--------------+----------------+-----+-----+-----+------+----------+------------+--------+--------+--------------+--------+
|  000020|    동화약품| 9350| -0.85|261159244500|2024-03-15| 736.0| 12.7|   null|   null| 13165.0|0.71|     180.0|      1.93|       1681856|        6.02|      27931470|            6.02| 9450| 9450| 9300| 7220

In [121]:
# Spark SQL fails to parse unquoted Korean column names used in conditions (they would
# need backtick quoting in every query), so rename every Korean column to an English
# snake_case name once, up front. eps/per/bps/pbr are already ASCII and are kept as-is.
stock_pdf = stock_pdf.rename(
    {
        '종목코드': 'stock_code',
        '종목명': 'stock_name',
        '종가': 'closing_price',
        '등락률': 'price_change',
        '시가총액': 'market_cap',
        '기준일': 'base_date',
        '선행eps': 'leading_eps',
        '선행per': 'leading_per',
        '주당배당금': 'dividend_per_share',
        '배당수익률': 'dividend_yield',
        '외국인보유수량': 'foreign_ownership_quantity',
        '외국인지분율': 'foreign_ownership_ratio',
        '외국인한도수량': 'foreign_limit_quantity',
        '외국인한도소진율': 'foreign_limit_exhaustion_ratio',
        '시가': 'opening_price',
        '고가': 'high_price',
        '저가': 'low_price',
        '거래량': 'trading_volume',
        '거래대금': 'trading_value',
        '기업고유번호': 'company_id',
        '시장구분': 'market_division',
        '종목구분': 'stock_division',
        '섹터': 'sector',
        '업종명': 'industry_name',
    },
    axis='columns',
)

# Build the Spark DataFrame from the renamed pandas DataFrame.
stock_sdf = spark.createDataFrame(stock_pdf)

In [128]:
from pyspark.sql.functions import isnan  # NOTE: imported but not used in this cell

# Convert NaN values coming from pandas into real nulls: first float NaN in numeric
# columns, then the literal string 'NaN' that appears in string columns (e.g. sector).
stock_sdf = stock_sdf.replace(float('nan'), None).replace('NaN', None)

In [129]:
## Register stock_sdf as the temporary view 'stock_view' so it can be queried with spark.sql().
stock_sdf.createOrReplaceTempView(name='stock_view')

In [130]:
spark.sql(sqlQuery='select *  from stock_view a').limit(num=5).show()  # first 5 rows of the view

+----------+------------+-------------+------------+------------+----------+------+-----+-----------+-----------+--------+----+------------------+--------------+--------------------------+-----------------------+----------------------+------------------------------+-------------+----------+---------+--------------+-------------+----------+---------------+--------------+--------------+-------------+
|stock_code|  stock_name|closing_price|price_change|  market_cap| base_date|   eps|  per|leading_eps|leading_per|     bps| pbr|dividend_per_share|dividend_yield|foreign_ownership_quantity|foreign_ownership_ratio|foreign_limit_quantity|foreign_limit_exhaustion_ratio|opening_price|high_price|low_price|trading_volume|trading_value|company_id|market_division|stock_division|        sector|industry_name|
+----------+------------+-------------+------------+------------+----------+------+-----+-----------+-----------+--------+----+------------------+--------------+--------------------------+--------

In [9]:
stock_sdf.select('*').limit(num=10).show()  # every column, first 10 rows

+--------+--------------+------+------+-------------+----------+------+-----+-------+-------+--------+----+----------+----------+--------------+------------+--------------+----------------+------+------+------+------+-----------+------------+--------+--------+--------------+----------+
|종목코드|        종목명|  종가|등락률|     시가총액|    기준일|   eps|  per|선행eps|선행per|     bps| pbr|주당배당금|배당수익률|외국인보유수량|외국인지분율|외국인한도수량|외국인한도소진율|  시가|  고가|  저가|거래량|   거래대금|기업고유번호|시장구분|종목구분|          섹터|    업종명|
+--------+--------------+------+------+-------------+----------+------+-----+-------+-------+--------+----+----------+----------+--------------+------------+--------------+----------------+------+------+------+------+-----------+------------+--------+--------+--------------+----------+
|  000020|      동화약품|  9350| -0.85| 261159244500|2024-03-15| 736.0| 12.7|   null|   null| 13165.0|0.71|     180.0|      1.93|       1681856|        6.02|      27931470|            6.02|  9450|  9450|  9300| 72206|  674278570|      1

In [51]:

spark.sql(sqlQuery='select upper(stock_name), closing_price from stock_view').show(n=20)  # upper() leaves Korean text unchanged


+------------------+-------------+
| upper(stock_name)|closing_price|
+------------------+-------------+
|          동화약품|         9350|
|          KR모터스|          465|
|              경방|         8470|
|        삼양홀딩스|        71300|
|      삼양홀딩스우|        54900|
|        하이트진로|        20300|
|    하이트진로2우B|        15710|
|          유한양행|        74000|
|        유한양행우|        63400|
|        CJ대한통운|       124600|
|  하이트진로홀딩스|         9000|
|하이트진로홀딩스우|        12000|
|              두산|       145000|
|            두산우|        64900|
|          두산2우B|        78600|
|      성창기업지주|         2025|
|                DL|        44850|
|              DL우|        24650|
|          유유제약|         4900|
|       유유제약1우|         5360|
+------------------+-------------+
only showing top 20 rows



In [53]:
import pyspark.sql.functions as F

# DataFrame-API version of the SQL above; a plain column-name string is equivalent to F.col().
stock_sdf.select(F.upper('stock_name'), 'closing_price').show(20)

+------------------+-------------+
| upper(stock_name)|closing_price|
+------------------+-------------+
|          동화약품|         9350|
|          KR모터스|          465|
|              경방|         8470|
|        삼양홀딩스|        71300|
|      삼양홀딩스우|        54900|
|        하이트진로|        20300|
|    하이트진로2우B|        15710|
|          유한양행|        74000|
|        유한양행우|        63400|
|        CJ대한통운|       124600|
|  하이트진로홀딩스|         9000|
|하이트진로홀딩스우|        12000|
|              두산|       145000|
|            두산우|        64900|
|          두산2우B|        78600|
|      성창기업지주|         2025|
|                DL|        44850|
|              DL우|        24650|
|          유유제약|         4900|
|       유유제약1우|         5360|
+------------------+-------------+
only showing top 20 rows



In [57]:
# - Add cap_stock_division holding the 1st and 2nd characters of stock_division, using the
#   SQL substring() function. Spark SQL string positions are 1-based, so start at 1
#   (a start of 0 happens to behave like 1, but misleadingly reads as 0-based).

sql_script = 'select a.stock_code, a.stock_name, substring(a.stock_division, 1, 2) as cap_stock_division from stock_view a'

spark.sql(sql_script).limit(5).show()


+----------+------------+------------------+
|stock_code|  stock_name|cap_stock_division|
+----------+------------+------------------+
|    000020|    동화약품|              보통|
|    000040|    KR모터스|              보통|
|    000050|        경방|              보통|
|    000070|  삼양홀딩스|              보통|
|    000075|삼양홀딩스우|              우선|
+----------+------------+------------------+



In [58]:
# DataFrame-API version; F.substring() positions are 1-based, so the first two characters start at 1.
stock_sdf.select('stock_code','stock_name', F.substring(F.col('stock_division'), 1, 2).alias('cap_stock_division')).limit(5).show()

+----------+------------+------------------+
|stock_code|  stock_name|cap_stock_division|
+----------+------------+------------------+
|    000020|    동화약품|              보통|
|    000040|    KR모터스|              보통|
|    000050|        경방|              보통|
|    000070|  삼양홀딩스|              보통|
|    000075|삼양홀딩스우|              우선|
+----------+------------+------------------+



In [70]:

# Stocks with per above 100 and market_cap above 10,000,000,000,000, via SQL.
sql_script = (
    'select stock_name, per, market_cap from stock_view '
    'where per > 100 and market_cap > 10000000000000'
)

spark.sql(sql_script).limit(num=5).show()


+--------------+------+--------------+
|    stock_name|   per|    market_cap|
+--------------+------+--------------+
|  포스코퓨처엠|207.27|24517109130000|
|      에코프로|419.99|16003228468000|
|LG에너지솔루션|120.54|93249000000000|
|  에코프로머티|510.88|10361579743600|
+--------------+------+--------------+



In [69]:
# DataFrame-API version of the same filter (where() is an alias of filter()).
stock_sdf.where((stock_sdf['per'] > 100) & (stock_sdf['market_cap'] > 10000000000000)) \
    .select('stock_name', 'per', 'market_cap') \
    .show(5)

+--------------+------+--------------+
|    stock_name|   per|    market_cap|
+--------------+------+--------------+
|  포스코퓨처엠|207.27|24517109130000|
|      에코프로|419.99|16003228468000|
|LG에너지솔루션|120.54|93249000000000|
|  에코프로머티|510.88|10361579743600|
+--------------+------+--------------+



In [73]:

# Stocks whose name contains '삼성' (LIKE pattern match), via SQL.
sql_script = (
    "select stock_name, sector, closing_price from stock_view "
    "where stock_name like '%삼성%'"
)
spark.sql(sql_script).limit(num=5).show()

+----------+--------+-------------+
|stock_name|  sector|closing_price|
+----------+--------+-------------+
|  삼성화재|     NaN|       309500|
|삼성화재우|     NaN|       240000|
|  삼성제약|건강관리|         1805|
|  삼성전자|      IT|        72300|
|삼성전자우|     NaN|        62000|
+----------+--------+-------------+



In [78]:
# DataFrame-API version of the LIKE query.
stock_sdf.where(stock_sdf['stock_name'].like("%삼성%")) \
    .select('stock_name', 'sector', 'closing_price') \
    .show(5)

+----------+--------+-------------+
|stock_name|  sector|closing_price|
+----------+--------+-------------+
|  삼성화재|     NaN|       309500|
|삼성화재우|     NaN|       240000|
|  삼성제약|건강관리|         1805|
|  삼성전자|      IT|        72300|
|삼성전자우|     NaN|        62000|
+----------+--------+-------------+
only showing top 5 rows



In [81]:
# Sort by market_cap in descending order with ORDER BY and keep only the top 10 rows.

sql_script = (
    "select stock_name, market_cap from stock_view "
    "order by market_cap desc limit 10"
)

spark.sql(sqlQuery=sql_script).show()

+----------------+---------------+
|      stock_name|     market_cap|
+----------------+---------------+
|        삼성전자|431615278365000|
|      SK하이닉스|117353981238000|
|  LG에너지솔루션| 93249000000000|
|삼성바이오로직스| 58789724000000|
|          현대차| 51507921711000|
|      삼성전자우| 51018975400000|
|            기아| 50255525375000|
|        셀트리온| 39423396969600|
|     POSCO홀딩스| 36788485050000|
|          KB금융| 30747543686400|
+----------------+---------------+



In [84]:
# DataFrame-API version: sort by market_cap descending (sort() is an alias of orderBy()), top 10 rows.
stock_sdf.select('stock_name', 'market_cap') \
    .sort(stock_sdf['market_cap'].desc()) \
    .limit(10) \
    .show()

+----------------+---------------+
|      stock_name|     market_cap|
+----------------+---------------+
|        삼성전자|431615278365000|
|      SK하이닉스|117353981238000|
|  LG에너지솔루션| 93249000000000|
|삼성바이오로직스| 58789724000000|
|          현대차| 51507921711000|
|      삼성전자우| 51018975400000|
|            기아| 50255525375000|
|        셀트리온| 39423396969600|
|     POSCO홀딩스| 36788485050000|
|          KB금융| 30747543686400|
+----------------+---------------+



In [86]:
# Number of rows in the view, via SQL.
sql_script = 'select count(*) as cnt from stock_view'

spark.sql(sqlQuery=sql_script).show()

+----+
| cnt|
+----+
|2671|
+----+



In [89]:
stock_sdf.count()  # DataFrame-API row count; kept as the last expression so the notebook displays it

2671

In [91]:
# Maximum market_cap, minimum closing_price and average per over the whole view.
sql_script = 'select max(market_cap), min(closing_price), avg(per) from stock_view'

spark.sql(sqlQuery=sql_script).show()

+---------------+------------------+-----------------+
|max(market_cap)|min(closing_price)|         avg(per)|
+---------------+------------------+-----------------+
|431615278365000|                77|51.47689375000001|
+---------------+------------------+-----------------+



In [93]:
# DataFrame-API version; passing column names as strings yields the same output column names.
stock_sdf.select(
    F.max("market_cap"),
    F.min("closing_price"),
    F.avg("per"),
).show()

+---------------+------------------+-----------------+
|max(market_cap)|min(closing_price)|         avg(per)|
+---------------+------------------+-----------------+
|431615278365000|                77|51.47689375000001|
+---------------+------------------+-----------------+



In [96]:
# Group by stock_division and compute, per group, the max market_cap, min closing_price and average per.
sql_script = (
    'select stock_division, '
    'max(market_cap) as max_market_cap, '
    'min(closing_price) as min_closing_price, '
    'avg(per) as avg_per '
    'from stock_view group by stock_division'
)

spark.sql(sql_script).show()

+--------------+---------------+-----------------+-----------------+
|stock_division| max_market_cap|min_closing_price|          avg_per|
+--------------+---------------+-----------------+-----------------+
|        보통주|431615278365000|              171|51.47689375000001|
|        우선주| 51018975400000|             1464|             null|
|          기타|  5599790411750|               77|             null|
|          스팩|    73651200000|             1976|             null|
+--------------+---------------+-----------------+-----------------+



In [107]:
# DataFrame-API version of the GROUP BY above: per stock_division, the max market_cap,
# min closing_price and average per. (Previously commented out, so re-running the cell
# did not reproduce its recorded output.)
stock_sdf.groupBy('stock_division').agg(F.max(F.col('market_cap')).alias('max_market_cap'),
                                        F.min(F.col('closing_price')).alias('min_closing_price'),
                                        F.avg(F.col('per')).alias('avg_per')).show()

+--------------+---------------+-----------------+-----------------+
|stock_division| max_market_cap|min_closing_price|          avg_per|
+--------------+---------------+-----------------+-----------------+
|        보통주|431615278365000|              171|51.47689375000001|
|        우선주| 51018975400000|             1464|             null|
|          기타|  5599790411750|               77|             null|
|          스팩|    73651200000|             1976|             null|
+--------------+---------------+-----------------+-----------------+



In [108]:
# Same per-stock_division aggregation, keeping only groups whose minimum closing price exceeds 100 (HAVING).
sql_script = (
    'select stock_division, '
    'max(market_cap) as max_market_cap, '
    'min(closing_price) as min_closing_price, '
    'avg(per) as avg_per '
    'from stock_view group by stock_division '
    'having min_closing_price > 100'
)

spark.sql(sql_script).show()

+--------------+---------------+-----------------+-----------------+
|stock_division| max_market_cap|min_closing_price|          avg_per|
+--------------+---------------+-----------------+-----------------+
|        보통주|431615278365000|              171|51.47689375000001|
|        우선주| 51018975400000|             1464|             null|
|          스팩|    73651200000|             1976|             null|
+--------------+---------------+-----------------+-----------------+



In [109]:
# DataFrame-API version of the HAVING query: aggregate per stock_division, then filter the
# aggregated rows on the min_closing_price alias. (Previously commented out, so re-running
# the cell did not reproduce its recorded output.)
stock_sdf.groupBy('stock_division').agg(F.max(F.col('market_cap')).alias('max_market_cap'),
                                        F.min(F.col('closing_price')).alias('min_closing_price'),
                                        F.avg(F.col('per')).alias('avg_per')).filter(F.col("min_closing_price") > 100).show()

+--------------+---------------+-----------------+-----------------+
|stock_division| max_market_cap|min_closing_price|          avg_per|
+--------------+---------------+-----------------+-----------------+
|        보통주|431615278365000|              171|51.47689375000001|
|        우선주| 51018975400000|             1464|             null|
|          스팩|    73651200000|             1976|             null|
+--------------+---------------+-----------------+-----------------+



In [None]:
%sql
-- The UPDATE below does not work (presumably because a Spark temp view does not support
-- UPDATE); the trailing comment shows the DataFrame-API equivalent via withColumn().
-- NOTE(review): titanic_view is not created in the visible part of this notebook (only
-- stock_view is); this cell looks copied from a Titanic example — confirm the view name.

update titanic_view set fare=fare*10 -- titanic_sdf.withColumn('Fare', col('Fare')*10)

In [None]:
%sql
-- NOTE(review): like the UPDATE above, DELETE is presumably not supported on a temp view,
-- and titanic_view is not created in the visible part of this notebook.

delete from titanic_view;

In [132]:
# NULL / NOT NULL filtering in SQL: rows that have a per value but no sector.
sql_script = (
    'select stock_name, sector, per, closing_price from stock_view '
    'where per is not null and sector is null'
)

spark.sql(sqlQuery=sql_script).show()

+------------+------+-----+-------------+
|  stock_name|sector|  per|closing_price|
+------------+------+-----+-------------+
|한화손해보험|  null| 2.66|         4945|
|    흥국화재|  null| 2.16|         4625|
|    삼성화재|  null|10.28|       309500|
|    만호제강|  null|15.18|        47150|
|유진투자증권|  null| 25.0|         4200|
|    부국증권|  null| 5.55|        26300|
|  상상인증권|  null|22.14|          775|
|    현대해상|  null| 4.86|        34500|
|  현대차증권|  null|  3.6|         9550|
|      SK증권|  null|26.96|          620|
|    신영증권|  null| 5.78|        68800|
|    한양증권|  null| 5.79|        10760|
|    유화증권|  null|34.03|         2280|
|  유안타증권|  null|13.06|         2730|
|    대신증권|  null|10.24|        17920|
|    코리안리|  null| 8.62|         8320|
|  DB손해보험|  null| 6.22|       101900|
|  NH투자증권|  null|14.63|        12890|
|    제주은행|  null| 19.5|        12400|
|미래에셋증권|  null| 7.76|         8020|
+------------+------+-----+-------------+
only showing top 20 rows



In [135]:
# DataFrame-API version of the NULL / NOT NULL filter.
stock_sdf.where(stock_sdf['per'].isNotNull() & stock_sdf['sector'].isNull()) \
    .select('stock_name', 'sector', 'per', 'closing_price') \
    .show()

+------------+------+-----+-------------+
|  stock_name|sector|  per|closing_price|
+------------+------+-----+-------------+
|한화손해보험|  null| 2.66|         4945|
|    흥국화재|  null| 2.16|         4625|
|    삼성화재|  null|10.28|       309500|
|    만호제강|  null|15.18|        47150|
|유진투자증권|  null| 25.0|         4200|
|    부국증권|  null| 5.55|        26300|
|  상상인증권|  null|22.14|          775|
|    현대해상|  null| 4.86|        34500|
|  현대차증권|  null|  3.6|         9550|
|      SK증권|  null|26.96|          620|
|    신영증권|  null| 5.78|        68800|
|    한양증권|  null| 5.79|        10760|
|    유화증권|  null|34.03|         2280|
|  유안타증권|  null|13.06|         2730|
|    대신증권|  null|10.24|        17920|
|    코리안리|  null| 8.62|         8320|
|  DB손해보험|  null| 6.22|       101900|
|  NH투자증권|  null|14.63|        12890|
|    제주은행|  null| 19.5|        12400|
|미래에셋증권|  null| 7.76|         8020|
+------------+------+-----+-------------+
only showing top 20 rows



In [139]:
    
# Bucket each stock by market_cap with CASE WHEN:
#   NULL                  -> 'NA'
#   <= 500,000,000,000    -> '중소기업' (small/medium enterprise)
#   <  10,000,000,000,000 -> '중견기업' (mid-sized enterprise)
#   otherwise             -> '대기업'   (large enterprise)
# The IS NULL check comes first so the 'NA' case does not depend on NULL comparisons
# silently falling through the earlier WHEN branches. The alias spelling
# 'coporate_division' is kept unchanged to match the DataFrame-API cell.
sql_script = '''
SELECT stock_name, stock_division, market_division, market_cap,
       CASE WHEN market_cap IS NULL THEN 'NA'
            WHEN market_cap <= 500000000000 THEN '중소기업'
            WHEN market_cap < 10000000000000 THEN '중견기업'
            ELSE '대기업'
       END AS coporate_division
FROM stock_view
'''

spark.sql(sql_script).show()

+------------------+--------------+---------------+-------------+-----------------+
|        stock_name|stock_division|market_division|   market_cap|coporate_division|
+------------------+--------------+---------------+-------------+-----------------+
|          동화약품|        보통주|          KOSPI| 261159244500|         중소기업|
|          KR모터스|        보통주|          KOSPI|  44704386225|         중소기업|
|              경방|        보통주|          KOSPI| 232207336900|         중소기업|
|        삼양홀딩스|        보통주|          KOSPI| 610632522300|         중견기업|
|      삼양홀딩스우|        우선주|          KOSPI|  16692784200|         중소기업|
|        하이트진로|        보통주|          KOSPI|1423712303300|         중견기업|
|    하이트진로2우B|        우선주|          KOSPI|  17754467980|         중소기업|
|          유한양행|        보통주|          KOSPI|5935470736000|         중견기업|
|        유한양행우|        우선주|          KOSPI|  74871596000|         중소기업|
|        CJ대한통운|        보통주|          KOSPI|2842418062400|         중견기업|
|  하이트진로홀딩스|        보통

In [143]:

# DataFrame-API version of the market_cap buckets using chained F.when().
# Testing isNull() first gives the same result as testing it last: comparisons against a
# null market_cap never match, so a null row reaches the isNull() branch either way.
stock_sdf.withColumn(
    'coporate_division',
    F.when(F.col('market_cap').isNull(), 'NA')
     .when(F.col('market_cap') <= 500000000000, '중소기업')
     .when(F.col('market_cap') < 10000000000000, '중견기업')
     .otherwise('대기업'),
).select("stock_name", "stock_division", "market_division", "market_cap", "coporate_division").show()


+------------------+--------------+---------------+-------------+-----------------+
|        stock_name|stock_division|market_division|   market_cap|coporate_division|
+------------------+--------------+---------------+-------------+-----------------+
|          동화약품|        보통주|          KOSPI| 261159244500|         중소기업|
|          KR모터스|        보통주|          KOSPI|  44704386225|         중소기업|
|              경방|        보통주|          KOSPI| 232207336900|         중소기업|
|        삼양홀딩스|        보통주|          KOSPI| 610632522300|         중견기업|
|      삼양홀딩스우|        우선주|          KOSPI|  16692784200|         중소기업|
|        하이트진로|        보통주|          KOSPI|1423712303300|         중견기업|
|    하이트진로2우B|        우선주|          KOSPI|  17754467980|         중소기업|
|          유한양행|        보통주|          KOSPI|5935470736000|         중견기업|
|        유한양행우|        우선주|          KOSPI|  74871596000|         중소기업|
|        CJ대한통운|        보통주|          KOSPI|2842418062400|         중견기업|
|  하이트진로홀딩스|        보통

### Databricks의 내장된 Graph 시각화 기능 사용하기. 
* 일반적으로 Spark에서는 Spark DataFrame을 pandas DataFrame으로 변환한 뒤 matplotlib이나 seaborn으로 시각화함
* Spark DataFrame을 pandas API와 매우 유사하게 사용할 수 있게 해주는 Koalas 패키지가 있음. Spark 3.2부터는 Koalas가 pandas API on Spark(`pyspark.pandas`)로 Spark에 통합됨. 
* Databricks는 Graph 시각화 기능이 내장되어 있음. 이를 이용하여 손쉽게 시각화 가능. 
* 시각화 기능을 적용할 때에는 클라이언트 레벨에서 너무 많은 데이터를 가공하지 않도록 유의(매우 많은 시간과 리소스 소모)

In [None]:
%sql
-- NOTE(review): titanic_view / titanic_sdf are not created in the visible part of this
-- notebook (only stock_view is registered); these visualization cells look copied from a
-- Titanic example and will presumably fail as-is — confirm, or rewrite against stock_view.
-- bar chart
select pclass, count(*) from titanic_view group by pclass

In [None]:
## display() enables Databricks' built-in charts for DataFrames as well, not only for SQL results.
# NOTE(review): titanic_sdf is not defined in the visible part of this notebook.
display(titanic_sdf.groupBy('Pclass').count())

In [None]:
%sql
-- histogram
select survived from titanic_view

In [None]:
%sql
-- bar chart
select Survived, count(*) from titanic_view group by Survived order by 2 desc

In [None]:
%sql 
-- Compare survived/died counts by Sex.
select Survived, Sex, count(*) from titanic_view group by survived, sex;

In [None]:
%sql
-- Compare survived/died counts by Pclass and Sex.
select Survived, Sex, Pclass, count(*) cnt from titanic_view group by survived, sex, pclass;

In [None]:
%sql
-- scatter plot
select fare, age from titanic_view

In [None]:
%sql
-- boxplot
select age from titanic_view