# Ascend PySpark Transform Notebook

This notebook lets you test PySpark transform code against data from the corresponding upstream component in Ascend before you add that code to an Ascend PySpark transform. It has the following 2 dependencies:

1. Install the Ascend [Python SDK](https://github.com/ascend-io/sdk-python)
```
pip3 install git+https://github.com/ascend-io/sdk-python.git
```

2. Install other Python libraries
```
pip3 install pyspark
pip3 install pandas
pip3 install pyarrow
```
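To confirm both installation steps worked before running the notebook, a quick check like the following can help. It is a minimal sketch that uses only the standard-library `importlib.util.find_spec`; it assumes the SDK installs as the importable `ascend` package, which is the name the notebook imports from.

```python
import importlib.util

# Top-level modules the notebook imports; "ascend" comes from the Ascend SDK
REQUIRED_MODULES = ["ascend", "pyspark", "pandas", "pyarrow"]


def missing_modules(names):
    """Return the module names that cannot be found on the current Python path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing:", ", ".join(missing))
    else:
        print("All dependencies are installed.")
```

`find_spec` locates a module without importing it, so the check is fast and has no side effects.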

This notebook has 5 key paragraphs; the sections below walk through what each one does.

# Paragraph 1: Create Ascend Client

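This paragraph initializes an Ascend client with the Ascend SDK, reading the access key ID and secret access key for the `trial` profile from `~/.ascend/credentials`.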

```python
from ascend.client import Client
import configparser
import pandas as pd
import os

# Profile name in the credentials file and the Ascend environment to connect to
profile = "trial"
host = "trial.ascend.io"

# Read the API keys from ~/.ascend/credentials
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.ascend/credentials"))

access_id = config.get(profile, "ascend_access_key_id")
secret_key = config.get(profile, "ascend_secret_access_key")

A = Client(host, access_id, secret_key)

# Optional: uncomment to list the data services and data feeds in this environment
# dsList = [str(elem) for elem in A.list_data_services()]
# print('\n','--List of Data Services in environment--','\n')
# print(*dsList,sep='\n')

# DATA_SERVICE = 'Getting_Started_with_Ascend'
# dfList = [str(elem) for elem in A.list_data_feeds(DATA_SERVICE)]
# print('\n','--List of Data Feeds in environment--','\n')
# print(*dfList,sep='\n')
```
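The cell above expects `~/.ascend/credentials` to be an INI-style file with a `[trial]` section holding the two keys it reads. The sketch below shows that layout with placeholder values (not real keys) and parses it with `configparser`, the same parser the notebook uses. The helper name `load_ascend_keys` is hypothetical.

```python
import configparser

# Placeholder contents for ~/.ascend/credentials; the section name is the profile
SAMPLE_CREDENTIALS = """
[trial]
ascend_access_key_id = YOUR_ACCESS_KEY_ID
ascend_secret_access_key = YOUR_SECRET_ACCESS_KEY
"""


def load_ascend_keys(text, profile):
    """Hypothetical helper: return (access_id, secret_key) for one profile."""
    config = configparser.ConfigParser()
    config.read_string(text)
    if not config.has_section(profile):
        raise KeyError(f"profile {profile!r} not found in credentials")
    return (
        config.get(profile, "ascend_access_key_id"),
        config.get(profile, "ascend_secret_access_key"),
    )


access_id, secret_key = load_ascend_keys(SAMPLE_CREDENTIALS, "trial")
print(access_id)  # YOUR_ACCESS_KEY_ID
```

`ConfigParser.read` silently skips a file it cannot open, so a missing credentials file only surfaces later as a `NoSectionError`; an explicit section check like the one above gives a clearer message.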


# Paragraph 2: Select the Upstream Component

This paragraph selects the upstream component in Ascend that serves as the input to the PySpark transform (`_clean_Weather` in the `Advanced_Tutorial` dataflow of the `Getting_Started_with_Ascend` data service). It also previews the first 10 rows of this component so you can see the dataset you are working with.

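```python
# Upstream component: _clean_Weather in the Advanced_Tutorial dataflow
component = A.get_data_service('Getting_Started_with_Ascend').get_dataflow('Advanced_Tutorial').get_component('_clean_Weather')
# Fetch the first 10 records and load them into pandas for preview
input_df = pd.DataFrame.from_records(component.get_records(0, 10))
input_df
```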

|   | Date       | Weather | rainfall | weather_date_ts      |
|---|------------|---------|----------|----------------------|
| 0 | 2012-01-01 | Cloudy  | 0.9      | 2012-01-01T00:00:00Z |
| 1 | 2012-01-02 | Sunny   | 0.0      | 2012-01-02T00:00:00Z |
| 2 | 2012-01-03 | Rainy   | 0.879734 | 2012-01-03T00:00:00Z |
| 3 | 2012-01-04 | Pouring | 1.698499 | 2012-01-04T00:00:00Z |
| 4 | 2012-01-05 | Cloudy  | 0.0      | 2012-01-05T00:00:00Z |
| 5 | 2012-01-06 | Sunny   | 0.0      | 2012-01-06T00:00:00Z |
| 6 | 2012-01-07 | Rainy   | 0.731318 | 2012-01-07T00:00:00Z |
| 7 | 2012-01-08 | Pouring | 1.547524 | 2012-01-08T00:00:00Z |
| 8 | 2012-01-09 | Cloudy  | 0.0      | 2012-01-09T00:00:00Z |
| 9 | 2012-01-10 | Sunny   | 0.0      | 2012-01-10T00:00:00Z |
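If you cannot reach Ascend (for example, without an active trial), a stand-in `input_df` can be built by hand from the preview so the remaining paragraphs still run. This is a sketch for offline experimentation only: it hard-codes the first four preview rows with their values as displayed (so `rainfall` is rounded), and it is not a substitute for the real component data.

```python
import pandas as pd

# First four rows of the _clean_Weather preview, copied as displayed
input_df = pd.DataFrame(
    {
        "Date": ["2012-01-01", "2012-01-02", "2012-01-03", "2012-01-04"],
        "Weather": ["Cloudy", "Sunny", "Rainy", "Pouring"],
        "rainfall": [0.9, 0.0, 0.879734, 1.698499],
        "weather_date_ts": [
            "2012-01-01T00:00:00Z",
            "2012-01-02T00:00:00Z",
            "2012-01-03T00:00:00Z",
            "2012-01-04T00:00:00Z",
        ],
    }
)
```

The column order matches the schema used in the next paragraph, so this frame can be passed to `spark.createDataFrame` unchanged.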


# Paragraph 3: Create Spark Session & Schema

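This paragraph starts a Spark session and converts the pandas preview into a Spark DataFrame, using a schema that matches the upstream component above. The result is appended to `inputs`, the list of DataFrames the transform receives.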

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# The transform receives its inputs as a list; index 0 is the upstream component
inputs = []

# Schema matching the columns of the upstream _clean_Weather component
myDfSchema = StructType([
    StructField('Date', StringType(), True),
    StructField('Weather', StringType(), True),
    StructField('rainfall', DoubleType(), True),
    StructField('weather_date_ts', StringType(), True),
])

# Reuse the running Spark session if there is one, otherwise start a local one
spark = SparkSession.builder.getOrCreate()
inputs.append(spark.createDataFrame(data=input_df, schema=myDfSchema))
```
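Hard-coding `myDfSchema` means editing it by hand whenever the upstream component changes. One alternative, sketched below, derives a DDL-formatted schema string from the pandas dtypes; `spark.createDataFrame` also accepts such a string as its `schema` argument. The helper name and the dtype-to-type mapping are assumptions, any dtype not listed falls back to `STRING`, and the result depends on the dtypes the SDK actually returns (timestamps, for example, may arrive as strings).

```python
import pandas as pd

# Assumed mapping from pandas dtype names to Spark SQL type names
_PANDAS_TO_SPARK = {
    "float64": "DOUBLE",
    "float32": "FLOAT",
    "int64": "BIGINT",
    "int32": "INT",
    "bool": "BOOLEAN",
    "datetime64[ns]": "TIMESTAMP",
}


def ddl_schema_from_pandas(pdf: pd.DataFrame) -> str:
    """Hypothetical helper: build a schema string like '`a` DOUBLE, `b` STRING'."""
    fields = []
    for name, dtype in pdf.dtypes.items():
        spark_type = _PANDAS_TO_SPARK.get(str(dtype), "STRING")
        # Backtick-quote names (escaping embedded backticks) so that names
        # such as Date cannot clash with SQL keywords
        quoted = "`" + str(name).replace("`", "``") + "`"
        fields.append(f"{quoted} {spark_type}")
    return ", ".join(fields)
```

In the cell above, this could replace the explicit schema with `spark.createDataFrame(data=input_df, schema=ddl_schema_from_pandas(input_df))`. The explicit `StructType` remains the safer choice when you need exact control over types and nullability.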

# Paragraph 4: PySpark Transform Code

This paragraph contains the exact PySpark code you can paste into an Ascend PySpark transform once it passes the test in this notebook.

If you already have an active trial with Ascend, you can see this Ascend PySpark transform here: https://trial.ascend.io/ui/v2/organization/Getting_Started_with_Ascend/project/Advanced_Tutorial/view/PySpark_Example.

If you don't yet have an active trial with Ascend, you can request one here https://www.ascend.io/trial.

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col
from typing import List

def transform(spark_session: SparkSession, inputs: List[DataFrame], credentials=None) -> DataFrame:
    """Transforms input DataFrame(s) and returns a single DataFrame as a result

    # Arguments
    spark_session -- Entrypoint into PySpark's Dataset and DataFrame API
    inputs        -- A nonempty List of the input components for this Transform. The index of each
                     input in the list is determined by how the Transform is configured.
    credentials   -- If set (in Advanced Settings), this variable holds the string contents
                     of the 'Credentials Secret' field.

    # Returns
    Any object of type DataFrame
    """
    _clean_Weather = inputs[0]
    # Register a temp view so the same logic could be written in SQL instead
    _clean_Weather.createOrReplaceTempView("_clean_Weather")
    # return spark_session.sql("""SELECT * FROM _clean_Weather""")
    # Add a column holding rainfall multiplied by 10
    return _clean_Weather.withColumn("ten_times_rainfall", col("rainfall") * 10)
```


# Paragraph 5: Testing PySpark Code

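This paragraph runs the transform from the previous paragraph on the test inputs and displays the first 10 rows of the result.

```python
# Call transform() with the Spark session and the list of input DataFrames
output = transform(spark, inputs)
# Convert the first 10 rows to pandas for display
output.limit(10).toPandas()
```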

|   | Date       | Weather | rainfall | weather_date_ts      | ten_times_rainfall |
|---|------------|---------|----------|----------------------|--------------------|
| 0 | 2012-01-01 | Cloudy  | 0.9      | 2012-01-01T00:00:00Z | 9.0                |
| 1 | 2012-01-02 | Sunny   | 0.0      | 2012-01-02T00:00:00Z | 0.0                |
| 2 | 2012-01-03 | Rainy   | 0.879734 | 2012-01-03T00:00:00Z | 8.797345           |
| 3 | 2012-01-04 | Pouring | 1.698499 | 2012-01-04T00:00:00Z | 16.984994          |
| 4 | 2012-01-05 | Cloudy  | 0.0      | 2012-01-05T00:00:00Z | 0.0                |
| 5 | 2012-01-06 | Sunny   | 0.0      | 2012-01-06T00:00:00Z | 0.0                |
| 6 | 2012-01-07 | Rainy   | 0.731318 | 2012-01-07T00:00:00Z | 7.313179           |
| 7 | 2012-01-08 | Pouring | 1.547524 | 2012-01-08T00:00:00Z | 15.475237          |
| 8 | 2012-01-09 | Cloudy  | 0.0      | 2012-01-09T00:00:00Z | 0.0                |
| 9 | 2012-01-10 | Sunny   | 0.0      | 2012-01-10T00:00:00Z | 0.0                |
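Eyeballing the table works for ten rows, but an automated comparison is easier to rerun after every change to the transform. The sketch below recomputes the expected `ten_times_rainfall` column with pandas and checks it against the Spark result converted with `toPandas()`, using `pandas.testing.assert_series_equal` with a small relative tolerance. The helper name is hypothetical; it checks only the new column and assumes the rows come back in the same order as the input.

```python
import pandas as pd
from pandas.testing import assert_series_equal


def check_ten_times_rainfall(input_pdf: pd.DataFrame, output_pdf: pd.DataFrame) -> None:
    """Hypothetical helper: raise AssertionError if the new column is wrong."""
    expected = (input_pdf["rainfall"] * 10).reset_index(drop=True)
    actual = output_pdf["ten_times_rainfall"].reset_index(drop=True)
    # The two series have different names, so only values and dtype are compared
    assert_series_equal(actual, expected, check_names=False, rtol=1e-9)
```

In the notebook this would be called as `check_ten_times_rainfall(input_df, output.toPandas())`; it returns `None` when the column matches and raises an `AssertionError` describing the mismatch otherwise.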
