In [None]:
# Smoke test: only prints a message to confirm the kernel executes cells.
print('hi')

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType
import pydeequ

# Connect to the standalone Spark cluster under the "finance_data_etl" app name.
# getOrCreate() returns the existing session if one is already running in this kernel.
# NOTE(review): no extra jars (JDBC driver, deequ) are configured here — confirm the
# cluster image provides the MySQL driver used by the JDBC cells.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("finance_data_etl")
    .getOrCreate()
)


In [None]:
from pyspark.sql.types import DoubleType
import os

# Load the AAPL stock CSV (header row, Spark-inferred column types).
# `_c0` is dropped: an unnamed first header column gets this name — presumably a
# pandas index written into the export (TODO confirm).
df_stock_AAPL = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load('input/stock/AAPL_data.csv')
    .drop('_c0')
)

df_stock_AAPL.show(3)

# Persist to MySQL and replace any existing `stock_AAPL` table.
# Credentials can be overridden via MYSQL_USER / MYSQL_PASSWORD. The fallbacks are
# the original local-dev values. TODO: drop them once the environment provides secrets.
df_stock_AAPL.write.format("jdbc").options(
    url="jdbc:mysql://db-mysql:3306/strading",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="stock_AAPL",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "1234"),
).mode("overwrite").save()

In [None]:
# get data
# budget data
# Explicit schema for input/budget.csv. With header=true and a user-supplied schema,
# Spark skips the header row and takes column names from this schema by position.
# The file's own (Korean) header names therefore never reach the DataFrame, so no
# renaming is needed. The former withColumnRenamed calls were silent no-ops.
# NOTE(review): nullable=False is presumably not enforced for CSV sources (Spark
# reads file data as nullable). Validate nulls explicitly downstream.
budget_schema = StructType([
    StructField('budget_id', IntegerType(), nullable=False),
    StructField('budget', IntegerType(), nullable=False),
])

df_budget = (
    spark.read.format('csv')
    .option('header', 'true')
    .schema(budget_schema)
    .load('input/budget.csv')
)

df_budget.show(5)

In [None]:
# Print the column names and types of df_budget (printSchema prints directly).
df_budget.printSchema()

In [None]:
# save data
import os

# Write df_budget to MySQL and replace any existing `budget` table.
# Credentials can be overridden via MYSQL_USER / MYSQL_PASSWORD. The fallbacks are
# the original local-dev values. TODO: drop them once the environment provides secrets.
df_budget.write.format("jdbc").options(
    url="jdbc:mysql://db-mysql:3306/strading",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="budget",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "1234"),
).mode("overwrite").save()

In [7]:
# customer data
# Explicit schema for input/customer.csv. The header row is skipped and columns are
# named from this schema by position.
customer_schema = StructType([
    StructField('customer_id', IntegerType(), nullable=False),
    StructField('name', StringType(), nullable=False),
    StructField('sex', StringType(), nullable=True),
    StructField('age', IntegerType(), nullable=True),
    StructField('budget_id', IntegerType(), nullable=True),
])

df_customer = (
    spark.read.format('csv')
    .option('header', 'true')
    .schema(customer_schema)
    .load('input/customer.csv')
)

In [None]:
import os

# Write df_customer to MySQL and replace any existing `customer` table.
# Credentials can be overridden via MYSQL_USER / MYSQL_PASSWORD. The fallbacks are
# the original local-dev values. TODO: drop them once the environment provides secrets.
df_customer.write.format("jdbc").options(
    url="jdbc:mysql://db-mysql:3306/strading",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="customer",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "1234"),
).mode("overwrite").save()

In [None]:
import yfinance as yf
import pandas as pd

# Tickers of representative IT companies.
companies = {
    "Apple": "AAPL",
    "Microsoft": "MSFT",
    "Google": "GOOGL",
    "Amazon": "AMZN",
    "Meta": "META"
}

# One pandas DataFrame of price history per company.
stock_data = []

# Fetch each company's stock history.
for company, ticker in companies.items():
    print(f"Fetching data for {company} ({ticker})...")
    stock = yf.Ticker(ticker)

    # Price history for the most recent month (yfinance default interval).
    data = stock.history(period="1mo")
    data["Company"] = company  # tag every row with the company name
    stock_data.append(data)

# Combine the per-company frames into one long frame. Printing the raw list of
# frames is unreadable. `stock_data` is kept for code that expects the list.
stock_df = pd.concat(stock_data)
print(stock_df)

In [None]:
from pyspark.sql import SparkSession
import pydeequ

# SparkSession with deequ, the BigQuery connector and the MySQL JDBC driver pulled
# in as Maven packages. pydeequ's f2j coordinate is excluded from resolution.
# NOTE(review): if a session already exists in this kernel (e.g. from an earlier
# cell), getOrCreate() returns it and static settings such as spark.jars.packages
# and spark.serializer are not applied. Restart the kernel before running this cell.
# NOTE(review): `spark.driverEnv.*` does not appear to be a documented Spark
# standalone setting. Confirm the driver really sees GOOGLE_APPLICATION_CREDENTIALS.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("finance_data_etl")
    .config(
        "spark.jars.packages",
        ",".join([
            "com.amazon.deequ:deequ:2.0.7-spark-3.5",
            "com.google.cloud.spark:spark-3.5-bigquery:0.41.1",
            "mysql:mysql-connector-java:8.0.33",
        ]),
    )
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config('spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS', 'myjars/spark2big-323e6547b0e3.json')
    .config('spark.driverEnv.GOOGLE_APPLICATION_CREDENTIALS', 'myjars/spark2big-323e6547b0e3.json')
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

In [None]:
# Raw finance CSV: header row present, column types inferred by Spark.
df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('input/Finance_data.csv')
)


In [None]:
# Row count, then the schema. printSchema() prints by itself and returns None,
# so wrapping it in print() would emit a stray "None" line.
print(df.count())
df.printSchema()

In [None]:
# Basic per-column summary statistics via DataFrame.describe().
df.describe().show()

## DB 저장

In [5]:
import os

# Append `df` to mydb.mytable in MySQL.
# NOTE(review): mode("append") adds rows on every run, so re-running this cell
# duplicates data. Switch to "overwrite" if that is not intended.
# Credentials can be overridden via MYSQL_USER / MYSQL_PASSWORD. The fallbacks are
# the original local-dev values. TODO: drop them once the environment provides secrets.
df.write.format("jdbc").options(
    url="jdbc:mysql://db-mysql:3306/mydb",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="mytable",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "1234"),
).mode("append").save()

##

## DB Load

In [None]:
import os

# Read mydb.mytable back from MySQL to verify the write.
# Credentials can be overridden via MYSQL_USER / MYSQL_PASSWORD. The fallbacks are
# the original local-dev values. TODO: drop them once the environment provides secrets.
df2 = spark.read.format('jdbc').options(
    url="jdbc:mysql://db-mysql:3306/mydb",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="mytable",
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "1234"),
).load()

df2.show()

## 데이터 정합성, 퀄리티

In [None]:
# Explicit imports instead of `import *`, so it is clear where each name comes from.
from pydeequ.analyzers import AnalysisRunner, AnalyzerContext, Completeness, Size

# deequ metrics on `df`: row count (Size) and completeness of the "age" column.
# NOTE(review): confirm Finance_data.csv actually has an "age" column.
analysisResult = (
    AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())
    .addAnalyzer(Completeness("age"))
    .run()
)

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

In [None]:
# Explicit import instead of `import *`.
from pydeequ.profiles import ColumnProfilerRunner

# Profile every column of `df` with deequ's column profiler.
result = ColumnProfilerRunner(spark).onData(df).run()

# Only the profile objects are printed. The column-name keys were unused.
for profile in result.profiles.values():
    print(profile)

### Constraint Suggestions

In [None]:
import json

# Explicit imports instead of `import *`.
from pydeequ.suggestions import DEFAULT, ConstraintSuggestionRunner

# Ask deequ to suggest constraints for `df` using its DEFAULT rule set.
suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(DEFAULT())
    .run()
)

# run() returns a Python dict. Printing it directly gives a Python repr (single
# quotes, None), not JSON, so dump it as indented JSON instead.
print(json.dumps(suggestionResult, indent=2))

# 빅쿼리

In [7]:
# BigQuery connector settings: parent project name and service-account key file.
spark.conf.set("parentProject", "spark2big")
spark.conf.set("credentialsFile", "myjars/spark2big-323e6547b0e3.json")

# Load a public sample table to exercise the BigQuery connection.
df = (
    spark.read
    .format("bigquery")
    .load("bigquery-public-data.samples.shakespeare")
)

In [None]:
# Row count of `df` (shown as the cell output).
df.count()

In [None]:
# Preview the rows of `df`.
df.show()

In [11]:
# Tiny hand-made 3-row frame. No column names are supplied, so Spark assigns
# default names to the three columns.
df = spark.createDataFrame(
    [(1, 'abc', 24), (2, 'def', 14), (3, 'ghu', 4)]
)

In [None]:
# Preview the rows of `df`.
df.show()

In [None]:

# BigQuery connector settings: parent project name and service-account key file.
spark.conf.set("parentProject", "spark2big")
spark.conf.set("credentialsFile", "myjars/spark2big-323e6547b0e3.json")

# Write `df` to mydataset.mytablename using the connector's "direct" write method.
(
    df.write.format("bigquery")
    .option("writeMethod", "direct")
    .save("mydataset.mytablename")
)

In [None]:
# Read back the BigQuery table mydataset.mytablename (the target of the direct write).
# The table option was previously ">", which is not a valid table reference.
spark.read.format("bigquery") \
    .option("table", "mydataset.mytablename").load()

In [None]:
# Smoke test: only prints a message to confirm the kernel executes cells.
print('hi')

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType
from pydeequ.profiles import *
from pydeequ.verification import VerificationSuite, VerificationResult
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes


# spark session
# deequ comes from Maven. The MySQL JDBC driver and BigQuery connector come from local jars.
# NOTE(review): if a session already exists in this kernel, getOrCreate() returns it
# and static settings (jars, memory, serializer) are not applied.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("scratch")
    .config('spark.jars.packages', 'com.amazon.deequ:deequ:2.0.7-spark-3.5')
    .config('spark.jars', 'myjars/packages_jars/mysql-connector-j-8.0.33.jar,myjars/packages_jars/spark-bigquery-with-dependencies_2.12-0.41.1.jar')
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)


# MySQL table to load (and validate in the following cells).
table_name = 'budget'

# Credentials can be overridden via MYSQL_USER / MYSQL_PASSWORD. The fallbacks are
# the original local-dev values. TODO: drop them once the environment provides secrets.
df = spark.read.format('jdbc').options(
    url="jdbc:mysql://db-mysql:3306/strading",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable=table_name,
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD", "1234"),
).load()

df.show(5)



In [None]:
# Data quality checks on the budget table (Warning level).
# Each constraint method returns the same Check, so the calls are chained directly.
check = (
    Check(spark, CheckLevel.Warning, "Comprehensive Data Quality Check")
    .isComplete("budget_id", "No nulls in budget_id")
    .isComplete("budget", "No nulls in budget")
    .isUnique("budget_id", "Unique budget_id")
    .isNonNegative("budget", "Non-negative budget")
    .hasDataType("budget_id", ConstrainableDataTypes.Integral, "budget_id is an integer")
    .hasDataType("budget", ConstrainableDataTypes.Integral, "budget is an integer")
    .satisfies("budget < 100000000", "Valid budget range")
)

# Run the verification and collect one row per constraint result.
result = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(check)
    .run()
)
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, result)

In [None]:
# Show each constraint next to its message, without truncating long text.
checkResult_df.select('constraint', 'constraint_message').show(truncate=False)

In [None]:
# Smoke test: only prints a message to confirm the kernel executes cells.
print('hi')

In [None]:
# Shut down the SparkSession.
# First stop the py4j callback server on the gateway (presumably started for
# pydeequ — TODO confirm), then stop Spark itself.
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()

In [None]:
# Each name is imported once (Check/VerificationSuite were imported two or three times).
from pyspark.sql import SparkSession
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes
from pydeequ.verification import VerificationSuite, VerificationResult

# Session with deequ on the classpath. No master is set explicitly here.
spark = (
    SparkSession.builder
    .appName("deequ_test")
    .config('spark.jars.packages', 'com.amazon.deequ:deequ:2.0.7-spark-3.5')
    .getOrCreate()
)

# Tiny sample. The None budget in row 2 should make the "budget" completeness check fail.
data = [
    {"budget_id": 1, "budget": 100},
    {"budget_id": 2, "budget": None},
    {"budget_id": 3, "budget": 200},
]

df = spark.createDataFrame(data)

check = Check(spark, CheckLevel.Warning, "Comprehensive Data Quality Check")
check = (
    check.isComplete("budget_id", "No nulls in budget_id")
    .isComplete("budget", "No nulls in budget")
    .isUnique("budget_id", "Unique budget_id")
    .satisfies("budget < 100000000", "Valid budget range")
    .isNonNegative("budget", "Non-negative budget")
)

# Run the quality verification.
result = VerificationSuite(spark).onData(df).addCheck(check).run()
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, result)
checkResult_df.show()


In [None]:
# Display the Maven coordinate pydeequ provides for `spark.jars.excludes`.
pydeequ.f2j_maven_coord

In [None]:
from pyspark.sql import SparkSession, Row
import pydeequ

# Session with deequ added via pydeequ's Maven coordinate; the f2j coordinate is excluded.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate()
)

# Three-row sample frame with columns a, b, c. The last row has a null in c.
df = spark.sparkContext.parallelize(
    [
        Row(a="foo", b=1, c=5),
        Row(a="bar", b=2, c=6),
        Row(a="baz", b=3, c=None),
    ]
).toDF()

In [None]:
# Explicit imports replace the `import *` lines and the redundant re-import.
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes
from pydeequ.verification import VerificationResult, VerificationSuite

# Demo of several deequ constraints. With the three-row a/b/c sample frame some are
# expected to fail: min(b) is 1 (not 0) and column c contains a null.
check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check.hasSize(lambda x: x >= 3)
        .hasMin("b", lambda x: x == 0)
        .isComplete("c")
        .isUnique("a")
        .isContainedIn("a", ["foo", "bar", "baz"])
        .isNonNegative("b")
        .hasDataType("c", ConstrainableDataTypes.Integral, hint="c is an integer")
    )
    .run()
)

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.select('constraint_message').show(truncate=False)

# Debug: final load from DB (MySQL) to DW (BigQuery)

In [None]:
# Marker cell: only prints a message; it does not test any connection itself.
print('connection check')

In [2]:
# SparkSession was imported twice; each name is now imported once.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType
from pydeequ.profiles import *
from pydeequ.verification import VerificationSuite, VerificationResult
from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes


# spark session
# deequ comes from Maven. The MySQL JDBC driver and BigQuery connector come from local jars.
# NOTE(review): if a session already exists in this kernel, getOrCreate() returns it
# and static settings (jars, memory, serializer) are not applied.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("scratch")
    .config('spark.jars.packages', 'com.amazon.deequ:deequ:2.0.7-spark-3.5')
    .config('spark.jars', 'myjars/packages_jars/mysql-connector-j-8.0.33.jar,myjars/packages_jars/spark-bigquery-with-dependencies_2.12-0.41.1.jar')
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)


:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /opt/bitnami/spark/.ivy2/cache
The jars for the packages stored in: /opt/bitnami/spark/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cac802af-de37-457d-b50c-4c3cacd7d857;1.0
	confs: [default]
	found com.amazon.deequ#deequ;2.0.7-spark-3.5 in central
	found org.scala-lang#scala-reflect;2.12.10 in central
	found org.scalanlp#breeze_2.12;2.1.0 in central
	found org.scalanlp#breeze-macros_2.12;2.1.0 in central
	found org.typelevel#spire_2.12;0.17.0 in central
	found org.typelevel#spire-macros_2.12;0.17.0 in central
	found org.typelevel#algebra_2.12;2.0.1 in central
	found org.typelevel#cats-kernel_2.12;2.1.1 in central
	found org.typelevel#spire-platform_2.12;0.17.0 in central
	found org.typelevel#spire-util_2.12;0.17.0 in central
	found dev.ludovic.netlib#blas;3.0.1 in central
	found net.sourceforge.f2j#arpack_combined_all;0.1 in central
	found dev.ludovic.netlib#lapack;3.0.1 in central
	f

In [4]:
import os

from pyspark.sql import SparkSession
import sys
import pydeequ

# Copy MySQL tables into BigQuery.
# This cell does not create a session; it uses the `spark` session from an earlier cell.
# NOTE(review): this key file differs from spark2big-323e6547b0e3.json used in
# other cells. Confirm which service-account key is current.
spark.conf.set("parentProject", "spark2big")
spark.conf.set("credentialsFile", "myjars/spark2big-992917168560.json")


# MySQL tables to copy; each lands in BigQuery as mydataset.<name>.
df_list = ['stocks']

for df_name in df_list:
    # Load the table from MySQL.
    # Credentials can be overridden via MYSQL_USER / MYSQL_PASSWORD. The fallbacks are
    # the original local-dev values. TODO: drop them once secrets are provided.
    df = spark.read.format('jdbc').options(
        url="jdbc:mysql://db-mysql:3306/strading",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable=df_name,
        user=os.environ.get("MYSQL_USER", "root"),
        password=os.environ.get("MYSQL_PASSWORD", "1234"),
    ).load()

    # Save to BigQuery with the direct write method, replacing any existing table.
    df.write.format("bigquery") \
        .option("writeMethod", "direct") \
        .mode('overwrite') \
        .save("mydataset." + df_name)


                                                                                