In [0]:
"""
    This script is used to load data to sql server database.
"""

Out[1]: '\n    This script is used to load data to sql server database.\n'

In [0]:
%pip install Faker

Python interpreter will be restarted.
Collecting Faker
  Downloading Faker-16.4.0-py3-none-any.whl (1.7 MB)
Installing collected packages: Faker
Successfully installed Faker-16.4.0
Python interpreter will be restarted.


In [0]:
import os
from typing import List

from faker import Faker
from pyspark.sql import SparkSession, functions, DataFrame
from pyspark.sql.types import StructField, StructType, DecimalType

In [0]:
# Get (or create) the Spark session with Hive support and the
# Databricks Glue catalog setting switched on.
spark = (
    SparkSession.builder
    .appName("ExamplePySparkSubmitTask")
    .config("spark.databricks.hive.metastore.glueCatalog.enabled", "true")
    .enableHiveSupport()
    .getOrCreate()
)

In [0]:
# Module-level Faker generator; main() passes it to create_sample_profiles().
faker = Faker()

In [0]:
def create_sample_profiles(
    _faker: "Faker",
    _num_profiles: int = 1000
) -> List[dict]:
    """
        This function is to create sample profiles

        :param _faker: generator whose ``profile()`` method is called once per
            sample. The annotation is the class name as a string, so nothing
            is evaluated or instantiated when the function is defined.
        :param _num_profiles: number of profiles to generate. Defaults to 1000;
            process_data offsets ids by 1000 per batch, so a larger value can
            make ids of consecutive batches overlap.
        :return: list with one ``_faker.profile()`` result per sample
    """
    return [_faker.profile() for _ in range(_num_profiles)]

In [0]:
def read_profiles_as_dataframe(
    _spark: SparkSession,
    _profiles: List[dict]
):
    """
        Turn the generated profiles into a Spark dataframe.

        The profiles are handed straight to ``createDataFrame`` without an
        explicit schema.
    """
    return _spark.createDataFrame(_profiles)


In [0]:
def process_data(
    _df: DataFrame,
    _count: int
):
    """
        Shape the raw profile dataframe for loading.

        Adds ``id``, ``age``, ``latitude`` and ``longitude`` columns, rewrites
        ``website`` as a comma-joined string and drops ``current_location``.
    """
    # Each batch gets its own id range: an offset of 1000 * _count plus the
    # generated id shifted by one.
    batch_offset = 1000 * _count
    shifted_id = functions.monotonically_increasing_id() + 1
    # Rounded random value scaled by 130.
    random_age = functions.round(functions.rand() * 130)

    # Collapse to one partition before the id column is generated.
    single_partition_df = _df.coalesce(1)
    enriched_df = (
        single_partition_df
        .withColumn('id', batch_offset + shifted_id)
        .withColumn('age', random_age)
        .withColumn('latitude', functions.col('current_location._1'))
        .withColumn('longitude', functions.col('current_location._2'))
        .withColumn('website', functions.concat_ws(',', 'website'))
    )
    return enriched_df.drop('current_location')

In [0]:
def write_dataframe_to_sql_server(
    _df: DataFrame
):
    """
        This function is to write dataframe to sql server

        Appends ``_df`` to the ``profiles`` table over JDBC. Connection
        settings are read from environment variables so that no password is
        stored in the notebook source:

        * ``SQLSERVER_PASSWORD`` - required.
        * ``SQLSERVER_USER`` - optional, defaults to ``AWS_SYNC``.
        * ``SQLSERVER_JDBC_URL`` - optional, defaults to the ``test`` database
          on the EC2 host below; set it to target another server.

        :param _df: dataframe to append
        :raises RuntimeError: if ``SQLSERVER_PASSWORD`` is unset or empty
    """
    password = os.environ.get('SQLSERVER_PASSWORD')
    if not password:
        # Fail before any connection is attempted, with a message that says
        # what to configure.
        raise RuntimeError(
            'SQLSERVER_PASSWORD environment variable must be set to write to SQL Server'
        )

    jdbc_options = {
        "url": os.environ.get(
            'SQLSERVER_JDBC_URL',
            'jdbc:sqlserver://ec2-52-221-236-170.ap-southeast-1.compute.amazonaws.com;databaseName=test'
        ),
        "user": os.environ.get('SQLSERVER_USER', 'AWS_SYNC'),
        "password": password,
        "dbtable": 'profiles'
    }
    _df \
        .write \
        .format('jdbc') \
        .options(**jdbc_options) \
        .mode('append') \
        .save()

In [0]:
def write_dataframe_to_s3(
    _df: DataFrame
):
    """
        Append the dataframe in parquet format to the table
        ``hengstyle_survey.profiles_glue``, with its data path set to the S3
        location below.
    """
    database_name = 'hengstyle_survey'
    table_name = 'profiles_glue'
    qualified_table = f'{database_name}.{table_name}'
    s3_target_path = 's3://hengstyle-survey-for-staging-data/profiles_glue'

    writer = _df.write.format('parquet')
    writer = writer.option("path", s3_target_path)
    writer = writer.mode('append')
    writer.saveAsTable(qualified_table)

In [0]:
def write_dataframe_to_redshift(
    _df: DataFrame
):
    """
        This function is to write dataframe to redshift

        Drops the ``address``, ``latitude`` and ``longitude`` columns and
        appends the rest to the ``profiles_for_redshift`` table through the
        Databricks Redshift connector. Connection settings are read from
        environment variables so that no password is stored in the notebook
        source:

        * ``REDSHIFT_PASSWORD`` - required.
        * ``REDSHIFT_USER`` - optional, defaults to ``awsuser``.
        * ``REDSHIFT_JDBC_URL`` - optional, defaults to the ``dev`` database
          on the cluster below.

        :param _df: dataframe to append
        :raises RuntimeError: if ``REDSHIFT_PASSWORD`` is unset or empty
    """
    password = os.environ.get('REDSHIFT_PASSWORD')
    if not password:
        # Fail before any connection is attempted, with a message that says
        # what to configure.
        raise RuntimeError(
            'REDSHIFT_PASSWORD environment variable must be set to write to Redshift'
        )

    jdbc_options = {
        "url": os.environ.get(
            'REDSHIFT_JDBC_URL',
            'jdbc:redshift://hengstyle-demo-data-platform-redshift-cluster.cphjiazy7e1z.ap-southeast-1.redshift.amazonaws.com:5439/dev'
        ),
        "user": os.environ.get('REDSHIFT_USER', 'awsuser'),
        "password": password,
        "dbtable": 'profiles_for_redshift',
        "aws_iam_role": "arn:aws:iam::375572388687:role/hengstyle_demo_redshift_service_role",
        "tempdir": "s3://hengstyle-demo-for-databricks/redshift/"
    }
    _df \
        .drop('address', 'latitude', 'longitude') \
        .write \
        .format('com.databricks.spark.redshift') \
        .options(**jdbc_options) \
        .mode('append') \
        .save()

In [0]:
def main(
    _count: int,
    _write_to_s3: bool = False,
    _write_to_redshift: bool = False
) -> DataFrame:
    """
        This is main function

        Generates one batch of sample profiles, converts it to a dataframe,
        processes it and appends it to SQL Server. The S3 and Redshift writes
        are opt-in.

        :param _count: batch number, forwarded to process_data where it
            determines the id offset of the batch
        :param _write_to_s3: also append the batch via write_dataframe_to_s3
        :param _write_to_redshift: also append the batch via
            write_dataframe_to_redshift
        :return: the processed dataframe, so callers can inspect it
            (e.g. ``main(0).count()``)
    """
    profiles = create_sample_profiles(
        _faker=faker
    )
    df = read_profiles_as_dataframe(
        _spark=spark,
        _profiles=profiles
    )
    processed_df = process_data(
        _df=df,
        _count=_count
    )
    write_dataframe_to_sql_server(
        _df=processed_df
    )
    if _write_to_s3:
        write_dataframe_to_s3(
            _df=processed_df
        )
    if _write_to_redshift:
        write_dataframe_to_redshift(
            _df=processed_df
        )
    # Errors from any step propagate unchanged to the caller.
    return processed_df

In [0]:
if __name__ == '__main__':
    # Load batch 0 when this is run as the top-level script/notebook.
    main(_count=0)

In [0]:
# Needs main() to return the processed dataframe; if main() returns None this
# line raises AttributeError. Note that main(0) runs the full pipeline again,
# including the SQL Server append, just to obtain the row count.
display(main(0).count())

1000

In [0]:
# if __name__ == '__main__':
#     main(0)

In [0]:
import time

# Load batches 36 through 40, one main() call per batch number.
count = 36
while count <= 40:
    print(count)
    main(_count=count)
    count = count + 1

36
37
38
39
40


In [0]:
# %sql
# select * from hengstyle_demo.profiles where id = 1

