<a href="https://colab.research.google.com/github/holictoweb/spark_deep_dive/blob/main/lakewarehouse/pyspark_datawarehouse_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### colab + yfinance + google drive + pyspark

1. pyspark 설치 
2. yfinance를 통해 데이터 수집
3. google drive 상에 parquet 으로 데이터 저장
4. pyspark을 통해 해당 데이터를 테이블로 저장
5. 데이터 분석 진행

### Datasource

[spark 3.1.1 공식 문서 ](https://spark.apache.org/docs/latest/sql-data-sources.html)



In [17]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import lit
 
db_path = '/content/drive/MyDrive/data-warehouse/stocklab_db'

spark = SparkSession.builder.appName('test_spark') \
  .config("spark.sql.warehouse.dir", db_path) \
  .enableHiveSupport() \
  .getOrCreate()

In [18]:
spark.sql("drop database if exists stocklab CASCADE")
#AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: InvalidOperationException(message:Database stocklab is not empty. One or more tables exist.)
#force drop cascade
spark.sql("create database IF NOT EXISTS stocklab")
spark.sql("show databases ").show()
spark.sql("describe database stocklab").show(100,False)

+---------+
|namespace|
+---------+
|  default|
| stocklab|
+---------+

+-------------------------+------------------------------------------------------------------+
|database_description_item|database_description_value                                        |
+-------------------------+------------------------------------------------------------------+
|Database Name            |stocklab                                                          |
|Comment                  |                                                                  |
|Location                 |file:/content/drive/MyDrive/data-warehouse/stocklab_db/stocklab.db|
|Owner                    |root                                                              |
+-------------------------+------------------------------------------------------------------+



In [28]:
import yfinance as yf
 
#data = yf.download("SPY AAPL", start="2017-01-01", end="2017-04-30") #sample code 
 
ticker_list = ["005930.KS", '068270.KS', '035420.KS', '064260.KQ']
 
for ticker in ticker_list:
  # pandas df 로 받은 데이터를 spark dataframe 으로 변경
  pdf = yf.download(ticker, sdate='2020-01-01')
  df = spark.createDataFrame(pdf)  #df.show()
  
  #df.write.format('parquet').save('drive/MyDrive/data-warehouse/test')  # column Adj Close 와 관련한 이슈 발생
  # AnalysisException: Attribute name "Adj Close" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it. 
 
  ticker = ticker.split('.')[0]
  df = df.withColumnRenamed("Adj Close", "AdjClose").withColumn("Code", lit(ticker))
  
  # 1. unmanaged table 구성 
  #    parquet 으로 먼저 다운 받은 데이터를 가지고 테이블을 구성. 테이블 삭제 시 해당 데이터 파일은 삭제 되지 않음
  table_path = '/content/drive/MyDrive/data-warehouse/stock_day/'

  # Overwrite specific partitions in spark dataframe write method
  # df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
  df.write.format('parquet').mode("overwrite").save(table_path + 'Code='+ ticker)
  df.show()
  create_table_sql = 'create table if not exists stocklab.stock_day using org.apache.spark.sql.parquet options (path "'+ table_path +'")'
  #print(create_table_sql)
  spark.sql(create_table_sql)

  # 2. managed table 구성
  # 현재 DB 구성 하위로 테이블을 생성 하며 df에서 db 상에 테이블을 생성하는 방식으로 테이블과 데이터가 함께 관리 되며 삭제도 함께 진행. 
  #df.write.mode("overwrite").partitionBy("Code").saveAsTable("mng_stock_day")
  

[*********************100%***********************]  1 of 1 completed
+------+------+------+------+----------------+--------+------+
|  Open|  High|   Low| Close|        AdjClose|  Volume|  Code|
+------+------+------+------+----------------+--------+------+
|6000.0|6110.0|5660.0|6110.0|4761.18310546875|74195000|005930|
|5800.0|6060.0|5520.0|5580.0|4348.18408203125|74680000|005930|
|5750.0|5780.0|5580.0|5620.0|4379.35400390625|54390000|005930|
|5560.0|5670.0|5360.0|5540.0|     4317.015625|40305000|005930|
|5600.0|5770.0|5580.0|5770.0|4496.24072265625|46880000|005930|
|5820.0|6100.0|5770.0|5770.0|4496.24072265625|59745000|005930|
|5610.0|5740.0|5600.0|5720.0| 4457.2783203125|29220000|005930|
|5600.0|5740.0|5560.0|5710.0|4449.48583984375|41190000|005930|
|5720.0|5880.0|5680.0|5830.0|4542.99560546875|49375000|005930|
|6000.0|6180.0|5920.0|6100.0|4753.39111328125|63505000|005930|
|6160.0|6160.0|5980.0|6100.0|4753.39111328125|45260000|005930|
|6000.0|6040.0|5960.0|5960.0|4644.29736328125|402

In [25]:
spark.sql("drop table stocklab.stock_day")

DataFrame[]

In [36]:
spark.sql ( "select * from stocklab.stock_day").show()
spark.sql("describe table extended stocklab.stock_day").show(truncate=False)


+------+------+------+------+----------------+--------+----+
|  Open|  High|   Low| Close|        AdjClose|  Volume|Code|
+------+------+------+------+----------------+--------+----+
|6000.0|6110.0|5660.0|6110.0|4761.18310546875|74195000|5930|
|5800.0|6060.0|5520.0|5580.0|4348.18408203125|74680000|5930|
|5750.0|5780.0|5580.0|5620.0|4379.35400390625|54390000|5930|
|5560.0|5670.0|5360.0|5540.0|     4317.015625|40305000|5930|
|5600.0|5770.0|5580.0|5770.0|4496.24072265625|46880000|5930|
|5820.0|6100.0|5770.0|5770.0|4496.24072265625|59745000|5930|
|5610.0|5740.0|5600.0|5720.0| 4457.2783203125|29220000|5930|
|5600.0|5740.0|5560.0|5710.0|4449.48583984375|41190000|5930|
|5720.0|5880.0|5680.0|5830.0|4542.99560546875|49375000|5930|
|6000.0|6180.0|5920.0|6100.0|4753.39111328125|63505000|5930|
|6160.0|6160.0|5980.0|6100.0|4753.39111328125|45260000|5930|
|6000.0|6040.0|5960.0|5960.0|4644.29736328125|40205000|5930|
|5860.0|6040.0|5820.0|6040.0|4706.63916015625|37315000|5930|
|5950.0|5980.0|5880.0|58

In [35]:
df = spark.sql("select * from stocklab.stock_day")
df.groupBy("Code").max("Code").show(truncate=False)

+-----+---------+
|Code |max(Code)|
+-----+---------+
|5930 |5930     |
|64260|64260    |
|35420|35420    |
|68270|68270    |
+-----+---------+



## run spark sql direct from file

https://spark.apache.org/docs/2.3.1/sql-programming-guide.html#run-sql-on-files-directly

In [40]:
# df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
spark.sql("select * from parquet.`/content/drive/MyDrive/data-warehouse/stock_day/*`").show(truncate=False)

+------+------+------+------+----------------+--------+------+
|Open  |High  |Low   |Close |AdjClose        |Volume  |Code  |
+------+------+------+------+----------------+--------+------+
|6000.0|6110.0|5660.0|6110.0|4761.18310546875|74195000|005930|
|5800.0|6060.0|5520.0|5580.0|4348.18408203125|74680000|005930|
|5750.0|5780.0|5580.0|5620.0|4379.35400390625|54390000|005930|
|5560.0|5670.0|5360.0|5540.0|4317.015625     |40305000|005930|
|5600.0|5770.0|5580.0|5770.0|4496.24072265625|46880000|005930|
|5820.0|6100.0|5770.0|5770.0|4496.24072265625|59745000|005930|
|5610.0|5740.0|5600.0|5720.0|4457.2783203125 |29220000|005930|
|5600.0|5740.0|5560.0|5710.0|4449.48583984375|41190000|005930|
|5720.0|5880.0|5680.0|5830.0|4542.99560546875|49375000|005930|
|6000.0|6180.0|5920.0|6100.0|4753.39111328125|63505000|005930|
|6160.0|6160.0|5980.0|6100.0|4753.39111328125|45260000|005930|
|6000.0|6040.0|5960.0|5960.0|4644.29736328125|40205000|005930|
|5860.0|6040.0|5820.0|6040.0|4706.63916015625|37315000|

## pyspark 사용방법
- pip install pyspark 설치 이후 
  stored in directory 위치 확인 
  
/root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f

- spark home 지정

PYSPARK_PYTHON=python3 SPARK_HOME=~/root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f

In [3]:
# 기본 설정 

!pip install pyspark
!pip install yfinance

#!PYSPARK_PYTHON=python3 SPARK_HOME=~/root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f

from google.colab import drive
drive.mount('/content/drive')
!ls /content/drive/MyDrive/data-warehouse/stock_day

Collecting yfinance
  Downloading https://files.pythonhosted.org/packages/a7/ee/315752b9ef281ba83c62aa7ec2e2074f85223da6e7e74efb4d3e11c0f510/yfinance-0.1.59.tar.gz
Collecting lxml>=4.5.1
[?25l  Downloading https://files.pythonhosted.org/packages/cf/4d/6537313bf58fe22b508f08cf3eb86b29b6f9edf68e00454224539421073b/lxml-4.6.3-cp37-cp37m-manylinux1_x86_64.whl (5.5MB)
[K     |████████████████████████████████| 5.5MB 7.5MB/s 
Building wheels for collected packages: yfinance
  Building wheel for yfinance (setup.py) ... [?25l[?25hdone
  Created wheel for yfinance: filename=yfinance-0.1.59-py2.py3-none-any.whl size=23442 sha256=843d9d63e68c33a2445e47e1a6d651577666d7988459829ea74cd40b87262c31
  Stored in directory: /root/.cache/pip/wheels/f8/2a/0f/4b5a86e1d52e451757eb6bc17fd899629f0925c777741b6d04
Successfully built yfinance
Installing collected packages: lxml, yfinance
  Found existing installation: lxml 4.2.6
    Uninstalling lxml-4.2.6:
      Successfully uninstalled lxml-4.2.6
Successfully

In [6]:
!ls /content/drive/MyDrive/data-warehouse/

part-00000-2b948eaf-6101-4dc5-a7c5-a7ffc8845fe0-c000.snappy.parquet
part-00000-733393fb-9bcc-4464-8a40-983d0e799833-c000.snappy.parquet
part-00000-a1204b83-0571-468c-807d-50ac96b0c149-c000.snappy.parquet
part-00000-b92531e8-407a-42a2-b238-84fdbcb557e0-c000.snappy.parquet
part-00000-dc247411-6349-4250-91b0-6750eee87076-c000.snappy.parquet
part-00001-2b948eaf-6101-4dc5-a7c5-a7ffc8845fe0-c000.snappy.parquet
part-00001-733393fb-9bcc-4464-8a40-983d0e799833-c000.snappy.parquet
part-00001-a1204b83-0571-468c-807d-50ac96b0c149-c000.snappy.parquet
part-00001-b92531e8-407a-42a2-b238-84fdbcb557e0-c000.snappy.parquet
part-00001-dc247411-6349-4250-91b0-6750eee87076-c000.snappy.parquet
stocklab.db
_SUCCESS
