In [8]:
import os

import findspark
findspark.init()
findspark.find()

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.window as w
import pyspark.sql.types as t

In [2]:
# Build the SparkSession used by every later cell (reuses an existing one if present).
spark = (
    SparkSession.builder
    .appName('mycourse')
    .getOrCreate()
)

In [3]:
# Define an explicit schema so each column is read with a concrete type;
# without a schema (and without inferSchema) the CSV reader yields string columns.
custom_schema = t.StructType([
    t.StructField('date_time',   t.StringType(), True),
    t.StructField('userid',      t.StringType(), True),
    t.StructField('domain',      t.StringType(), True),
    t.StructField('dlbytes',     t.IntegerType(), True),
    t.StructField('ulbytes',     t.IntegerType(), True),
    t.StructField('clientip',    t.StringType(), True),
    t.StructField('serverip',    t.StringType(), True),
    t.StructField('country',     t.StringType(), True),
    t.StructField('txn_time',    t.FloatType(), True),
    t.StructField('http_method', t.StringType(), True),
    t.StructField('user_agent',  t.StringType(), True),
    t.StructField('platform',    t.StringType(), True)
])
# Keep the original (misspelled) name working for any cell that still uses it.
custome_schema = custom_schema

# Load the data with the custom schema. Set the COURSE_DATA_PATH environment
# variable to point at the CSV on another machine; the original local path is
# kept as the default so the notebook behaves as before when it is unset.
data_path = os.environ.get(
    'COURSE_DATA_PATH',
    r'C:\Users\alex\Desktop\PySpark Crash Course Learn Spark Quickly\1 - Introduction\2 - course-file\course_file.csv',
)
# header=True: the first CSV line holds column names and is not loaded as data.
df = spark.read.csv(data_path, schema=custom_schema, header=True)

# Show the first five rows of the loaded DataFrame
df.show(5)

+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|     serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|68.171.236.18|Scotland|    1.32|       HTTP|Mozilla/5.0 (Wind...|   Linux|
|2023-10-03 09:43:11|9

In [10]:
# define a python function
def calculate_total_bytes(dlbytes, ulbytes):
    return dlbytes+ulbytes

# Wrap the function above as a Spark UDF.
# LongType (64-bit) instead of IntegerType: dlbytes and ulbytes are each 32-bit
# IntegerType columns, so their sum can exceed 2**31 - 1. A Python UDF result
# that does not fit the declared return type can come back as NULL instead of
# raising, silently losing data.
calculate_total_bytes_udf = f.udf(calculate_total_bytes, t.LongType())

# Use the UDF to derive a total_bytes column and preview the first rows.
df.withColumn('total_bytes', calculate_total_bytes_udf(df['dlbytes'], df['ulbytes'])).show(5)

+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+-----------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|     serverip| country|txn_time|http_method|          user_agent|platform|total_bytes|
+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+-----------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|    1614333|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|     580402|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|68.171.236.18|Scotland|    1.32|     

In [9]:
def custom_text_processing(hhtp_method):
    """Lower-case an HTTP method string (e.g. ``'HTTPS'`` -> ``'https'``).

    The parameter keeps its original (misspelled) name so keyword callers keep
    working. The ``http_method`` column is nullable, and Spark hands SQL NULLs
    to Python UDFs as ``None``. In that case ``None`` is returned instead of
    raising ``AttributeError``.

    Args:
        hhtp_method: HTTP method string, or ``None``.

    Returns:
        The lower-cased string, or ``None`` for a ``None`` input.
    """
    if hhtp_method is None:
        return None
    return hhtp_method.lower()

# Register the lower-casing function as a string-returning Spark UDF.
custom_text_processing_udf = f.udf(custom_text_processing, returnType=t.StringType())

# Add a lower-cased copy of http_method and preview the first rows.
(
    df
    .withColumn('processed_http', custom_text_processing_udf(df['http_method']))
    .show(5)
)

+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+--------------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|     serverip| country|txn_time|http_method|          user_agent|platform|processed_http|
+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+--------------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|         https|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|          http|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|68.171.236.18|Scotland

In [12]:
def txn_time_milliseconds(txn_time):
    """Scale a transaction time by 1000 (presumably seconds -> milliseconds).

    NOTE(review): the seconds unit is inferred from the function name; confirm
    it against the data source.

    ``txn_time`` is nullable in the schema and Spark passes NULL to Python UDFs
    as ``None``. That value is returned as ``None`` instead of raising
    ``TypeError``.

    Args:
        txn_time: Numeric transaction time, or ``None``.

    Returns:
        ``txn_time * 1000``, or ``None`` for a ``None`` input.
    """
    if txn_time is None:
        return None
    return txn_time * 1000

# Register the conversion as a Spark UDF returning FloatType.
txn_time_milliseconds_udf = f.udf(txn_time_milliseconds, returnType=t.FloatType())

# Preview the data with the derived txn_time_milliseconds column.
(
    df
    .withColumn('txn_time_milliseconds', txn_time_milliseconds_udf(df['txn_time']))
    .show(5)
)

+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+---------------------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|     serverip| country|txn_time|http_method|          user_agent|platform|txn_time_milliseconds|
+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+---------------------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|               2330.0|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|               1200.0|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  1