PySpark Crash Course


In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("mycourse")
    .getOrCreate()
)

In [3]:
# Load the course CSV; the first line of the file supplies the column names.
df = spark.read.csv(
    path='./course_file.csv',
    header=True,
)

In [4]:
# Peek at the first two rows.
df.show(n=2)

+-------------------+-------------+----------------+-------+-------+---------------+-------------+-------+--------+-----------+--------------------+--------+
|          date_time|       userid|          domain|dlbytes|ulbytes|       clientip|     serverip|country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+----------------+-------+-------+---------------+-------------+-------+--------+-----------+--------------------+--------+
|2023-10-04 11:37:11|7773153683656|ryan-wells.co.uk| 872807| 741526|142.132.219.110|62.42.184.180|England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|
|2023-10-04 12:17:07|1886351675683|     hopkins.org|  50898| 529504|  184.205.48.78|152.123.41.39|  Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|
+-------------------+-------------+----------------+-------+-------+---------------+-------------+-------+--------+-----------+--------------------+--------+
only showing top 2 rows



In [5]:
# Print each column's name, type and nullability as a tree.
df.printSchema()

root
 |-- date_time: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- dlbytes: string (nullable = true)
 |-- ulbytes: string (nullable = true)
 |-- clientip: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- country: string (nullable = true)
 |-- txn_time: string (nullable = true)
 |-- http_method: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- platform: string (nullable = true)



In [6]:
# Summary statistics for every column. All columns are still strings at this
# point (see the schema printed above), so the means shown for numeric-looking
# columns presumably come from an implicit cast — TODO confirm.
df.describe().show()

+-------+-------------------+--------------------+-----------------+------------------+-----------------+-----------+------------+-------+------------------+-----------+--------------------+--------+
|summary|          date_time|              userid|           domain|           dlbytes|          ulbytes|   clientip|    serverip|country|          txn_time|http_method|          user_agent|platform|
+-------+-------------------+--------------------+-----------------+------------------+-----------------+-----------+------------+-------+------------------+-----------+--------------------+--------+
|  count|             100000|              100000|           100000|            100000|           100000|     100000|      100000| 100000|            100000|     100000|              100000|  100000|
|   mean|               NULL|5.010135665120539E12|             NULL|      499993.58573|     499925.16314|       NULL|        NULL|   NULL|1.4980519999999975|       NULL|                NULL|    NULL|


In [7]:
# Total number of rows in the DataFrame.
df.count()

100000

In [8]:
# Column names as a plain Python list.
df.columns

['date_time',
 'userid',
 'domain',
 'dlbytes',
 'ulbytes',
 'clientip',
 'serverip',
 'country',
 'txn_time',
 'http_method',
 'user_agent',
 'platform']

In [9]:
# (column name, type name) pairs for every column.
df.dtypes

[('date_time', 'string'),
 ('userid', 'string'),
 ('domain', 'string'),
 ('dlbytes', 'string'),
 ('ulbytes', 'string'),
 ('clientip', 'string'),
 ('serverip', 'string'),
 ('country', 'string'),
 ('txn_time', 'string'),
 ('http_method', 'string'),
 ('user_agent', 'string'),
 ('platform', 'string')]

In [10]:
# Project the DataFrame down to the two columns of interest.
countries = df.select(['country', 'dlbytes'])

In [11]:
# Show the first two rows of the projection.
countries.show(n=2)

+-------+-------+
|country|dlbytes|
+-------+-------+
|England| 872807|
|  Wales|  50898|
+-------+-------+
only showing top 2 rows



In [13]:
from pyspark.sql.types import *
# Explicit schema: the earlier schema-less load produced string columns only.
# Byte counts become integers and txn_time a float; every column is nullable.
# StructType.add() returns the schema itself, which is what lets the calls chain.
custom_schema = (
    StructType()
    .add("date_time", StringType(), True)
    .add("userid", StringType(), True)
    .add("domain", StringType(), True)
    .add("dlbytes", IntegerType(), True)
    .add("ulbytes", IntegerType(), True)
    .add("clientip", StringType(), True)
    .add("serverip", StringType(), True)
    .add("country", StringType(), True)
    .add("txn_time", FloatType(), True)
    .add("http_method", StringType(), True)
    .add("user_agent", StringType(), True)
    .add("platform", StringType(), True)
)

# Re-read the same CSV, this time applying the explicit schema.
data_path = "./course_file.csv"
df = spark.read.csv(path=data_path, schema=custom_schema, header=True)

# Display the typed DataFrame.
df.show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|  62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|  152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|  68.171.236.18|Scotland|    1.32|       HTTP|Mozilla/5.0 (Wind...|   Linux|
|2023-10-0

In [14]:
# Check that the custom schema took effect: dlbytes/ulbytes should now be
# integer and txn_time float.
df.printSchema()

root
 |-- date_time: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- dlbytes: integer (nullable = true)
 |-- ulbytes: integer (nullable = true)
 |-- clientip: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- country: string (nullable = true)
 |-- txn_time: float (nullable = true)
 |-- http_method: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- platform: string (nullable = true)



In [15]:
# Replace nulls column by column: zero for the byte counters and a placeholder
# label for country. The result is only displayed; df itself is not modified.
df.na.fill(
    {'dlbytes': 0, 'ulbytes': 0, 'country': 'Unknown'}
).show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|  62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|  152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|  68.171.236.18|Scotland|    1.32|       HTTP|Mozilla/5.0 (Wind...|   Linux|
|2023-10-0

In [16]:
# Replace nulls with the string '1' across the DataFrame.
# NOTE(review): a string fill value is expected to apply to string-typed
# columns only — confirm against the DataFrame.fillna documentation.
df.na.fill('1').show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|  62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|  152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|  68.171.236.18|Scotland|    1.32|       HTTP|Mozilla/5.0 (Wind...|   Linux|
|2023-10-0

In [17]:
# Drop rows whose country is null; nulls in other columns are not considered.
df.na.drop(subset=['country']).show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|  62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|  152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|  68.171.236.18|Scotland|    1.32|       HTTP|Mozilla/5.0 (Wind...|   Linux|
|2023-10-0

In [18]:
from pyspark.sql import functions as F

In [19]:
# Mean of dlbytes as a plain Python value: the aggregate yields a single row,
# and its first field is the average.
average = df.agg(F.avg('dlbytes')).first()[0]

In [20]:
# Show the computed mean of dlbytes.
print(average)

499993.58573


In [21]:
# Replace nulls: the column mean for dlbytes, zero for ulbytes and a
# placeholder label for country. The result is only displayed.
# dlbytes is an integer column, so a float fill value would be cast to int and
# lose its fractional part; rounding first fills with the nearest integer.
df.fillna(
    {
        'dlbytes' : int(round(average)),
        'ulbytes' : 0,
        'country' : 'Unknown'
    }
).show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|  62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|  152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|  68.171.236.18|Scotland|    1.32|       HTTP|Mozilla/5.0 (Wind...|   Linux|
|2023-10-0

In [22]:
# Show two rows of df; none of the fillna/dropna results above were assigned back.
df.show(n=2)

+-------------------+-------------+----------------+-------+-------+---------------+-------------+-------+--------+-----------+--------------------+--------+
|          date_time|       userid|          domain|dlbytes|ulbytes|       clientip|     serverip|country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+----------------+-------+-------+---------------+-------------+-------+--------+-----------+--------------------+--------+
|2023-10-04 11:37:11|7773153683656|ryan-wells.co.uk| 872807| 741526|142.132.219.110|62.42.184.180|England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|
|2023-10-04 12:17:07|1886351675683|     hopkins.org|  50898| 529504|  184.205.48.78|152.123.41.39|  Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|
+-------------------+-------------+----------------+-------+-------+---------------+-------------+-------+--------+-----------+--------------------+--------+
only showing top 2 rows



In [23]:
# Register the DataFrame as a temporary view named "dfsql" so it can be
# queried through spark.sql.
df.createOrReplaceTempView("dfsql")

In [24]:
# Rows per country among transactions that downloaded more than 50000 bytes.
spark.sql(
    "select country, count(*) from dfsql where dlbytes > 50000 group by country "
).show()

+--------+--------+
| country|count(1)|
+--------+--------+
|   Wales|   23802|
| Ireland|   23718|
| England|   23878|
|Scotland|   23607|
+--------+--------+



In [25]:
# Per-country aggregates using the dict form of agg(): column name -> function.
grouped_df_1 = (
    df.groupBy('country')
    .agg({'dlbytes': 'sum', 'ulbytes': 'avg'})
)

# Give the auto-named average column a friendlier name for display only;
# grouped_df_1 itself keeps the generated column names.
grouped_df_1.withColumnRenamed(
    'avg(ulbytes)',
    'AvgUL',
).show()

+--------+------------------+------------+
| country|             AvgUL|sum(dlbytes)|
+--------+------------------+------------+
|   Wales| 501170.9036600422| 12506206681|
| Ireland|498588.33693996957| 12496130770|
| England| 500240.4421728041| 12629527260|
|Scotland|499690.46323203866| 12367493862|
+--------+------------------+------------+



In [26]:
from pyspark.sql.functions import *

# Per (country, platform): total download bytes, mean upload bytes and row count.
# The aggregates are reached through the F namespace so the Spark functions
# cannot be confused with Python built-ins such as sum().
(
    df.groupBy('country', 'platform')
    .agg(
        F.sum('dlbytes').alias('DL'),
        F.avg('ulbytes').alias('UL'),
        F.count('*').alias('row count'),
    )
    .show()
)

+--------+--------+----------+------------------+---------+
| country|platform|        DL|                UL|row count|
+--------+--------+----------+------------------+---------+
| England| Windows|2519261611|496956.96056562435|     5021|
|   Wales| Android|2486373821| 499227.4556886228|     5010|
| Ireland| Android|2462635163| 501362.6873722926|     4894|
| Ireland|     Mac|2507337591| 496117.7526796348|     5038|
|Scotland| Windows|2492168780|498358.41915447806|     4991|
|Scotland|   Linux|2486170038| 502737.1797207043|     4941|
| Ireland| Windows|2486340036| 495752.9689328223|     4957|
| England|     Mac|2561386336| 500483.1044216344|     5066|
|   Wales|     iOS|2503588431|499370.84761152783|     5066|
| England|   Linux|2498926814| 497278.5799840828|     5026|
|Scotland|     iOS|2447852329| 496049.2466639709|     4946|
|   Wales|   Linux|2512986377|497722.29527794383|     5019|
|   Wales| Windows|2517792294|  507253.420969023|     5036|
|Scotland| Android|2487105964|500750.421

In [27]:
# Derived column: download plus upload bytes for each row (first five rows shown).
df.withColumn(
    'totalbytes',
    F.col('dlbytes') + F.col('ulbytes'),
).show(n=5)

+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+----------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|     serverip| country|txn_time|http_method|          user_agent|platform|totalbytes|
+-------------------+-------------+--------------------+-------+-------+---------------+-------------+--------+--------+-----------+--------------------+--------+----------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|   1614333|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|    580402|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|68.171.236.18|Scotland|    1.32|       HTT

In [28]:
# Derived column: the calendar year taken from the date_time column.
df.withColumn(
    'year',
    F.year(F.col('date_time')),
).show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+----+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|year|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+----+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|  62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|2023|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|  152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|2023|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|  68.171.236.18|Scotland|    1.32|       HTTP|Mozilla/5.0 (Win

In [29]:
# Derived column: txn_time multiplied by 1000.
# NOTE(review): the column name implies txn_time is in seconds — confirm units.
df.withColumn(
    'txntime_milliseconds',
    F.col('txn_time') * F.lit(1000),
).show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+--------------------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|txntime_milliseconds|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+--------------------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|  62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|              2330.0|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|  152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|              1200.0|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 95242

In [30]:
# Label each row 'Mobile' or 'desktop' based on its platform.
# The data spells the Apple platform 'iOS' and isin() compares exact values, so
# the earlier 'ios' literal never matched and every iOS row fell through to
# 'desktop'. Matching on 'iOS' classifies those rows correctly.
df.withColumn(
    'device_type',
    when(col('platform').isin('iOS', 'Android'), 'Mobile').otherwise('desktop'),
).show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+-----------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|device_type|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+-----------+
|2023-10-04 11:37:11|7773153683656|    ryan-wells.co.uk| 872807| 741526|142.132.219.110|  62.42.184.180| England|    2.33|      HTTPS|Mozilla/5.0 (Wind...| Android|     Mobile|
|2023-10-04 12:17:07|1886351675683|         hopkins.org|  50898| 529504|  184.205.48.78|  152.123.41.39|   Wales|     1.2|       HTTP|Mozilla/5.0 (Wind...| Android|     Mobile|
|2023-10-02 23:25:12|1597721345356|           evans.com| 964276| 952420|  189.30.60.163|  68.171.236.18|Scotland|  

In [31]:
# Keep a single row per userid.
# NOTE(review): this rebinds `df`, so every later cell works on the
# de-duplicated data and re-running earlier cells afterwards gives different
# results. Which of the duplicate rows survives is presumably arbitrary —
# confirm before relying on it.
df = df.dropDuplicates(['userid'])

In [32]:
# Sanity check on the de-duplication: each userid should now have a count of 1.
(
    df.groupBy('userid')
    .count()
    .show()
)

+-------------+-----+
|       userid|count|
+-------------+-----+
|9476243665412|    1|
|7435163662288|    1|
|6078186529360|    1|
|0404364283997|    1|
|9712471062565|    1|
|5518019884193|    1|
|0154603546029|    1|
|0975095706028|    1|
|0092213970495|    1|
|8292463551450|    1|
|9193610451990|    1|
|8470101955002|    1|
|4096254500198|    1|
|7089895133125|    1|
|0133730261390|    1|
|8300394808680|    1|
|1409348237572|    1|
|8781094037166|    1|
|1837572565487|    1|
|9743743756974|    1|
+-------------+-----+
only showing top 20 rows



In [None]:
# Write the DataFrame to the 'mycsvfile' output path as CSV.
# mode='overwrite' lets the notebook be re-run without failing because the
# path already exists; header=True keeps the column names, matching the
# header=True used when the source CSV was read.
df.write.csv('mycsvfile', mode='overwrite', header=True)

In [None]:
# Collapse the DataFrame to a single partition; per the inline note the intent
# is for the following write to produce only one file.
# NOTE(review): this rebinds `df`, so every later cell also runs on the
# single-partition DataFrame.
df = df.coalesce(1) # only one file

In [None]:
# Write the DataFrame to the 'mycsvfile2' output path as CSV.
# mode='overwrite' lets the notebook be re-run without failing because the
# path already exists; header=True keeps the column names, matching the
# header=True used when the source CSV was read.
df.write.csv('mycsvfile2', mode='overwrite', header=True)

In [None]:
# Write the DataFrame to the 'myjsonfile' output path as JSON.
# mode='overwrite' lets the notebook be re-run without failing because the
# path already exists.
df.write.json('myjsonfile', mode='overwrite')

In [36]:
from pyspark.sql.functions import when

In [37]:
# CASE-style column: "large" when dlbytes exceeds 500000, otherwise "small".
df.withColumn(
    "caseexample",
    when(df["dlbytes"] > 500000, "large").otherwise("small"),
).show()

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|caseexample|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|      large|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204|153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|      large|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4.95.67|Scotland|    0.31|     

In [38]:
from pyspark.sql.functions import isnull

In [39]:
# Flag column: 1 when user_agent is null, 0 otherwise.
df.withColumn(
    "nullexample",
    when(df["user_agent"].isNull(), 1).otherwise(0),
).show()

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|nullexample|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|          0|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204|153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|          0|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4.95.67|Scotland|    0.31|     

In [40]:
# Bucket each transaction by size. Branches are tried top to bottom, and BOTH
# the download and the upload byte counts must exceed a threshold to qualify;
# rows that satisfy none of the branches are labelled "tiny".
df.withColumn(
    "transactionsize",
    when((df["dlbytes"] > 500000) & (df["ulbytes"] > 500000), "large")
    .when((df["dlbytes"] > 250000) & (df["ulbytes"] > 250000), "medium")
    .when((df["dlbytes"] > 150000) & (df["ulbytes"] > 150000), "small")
    .otherwise("tiny"),
).show()

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+---------------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|transactionsize|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+---------------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|          small|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204|153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|           tiny|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4.95.67|Sco

In [41]:
# Label rows "active" when txn_time is above 1.6 and "inactive" otherwise.
df.withColumn(
    "solution",
    when(df["txn_time"] > 1.6, "active").otherwise("inactive"),
).show()

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+--------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|solution|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+--------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|inactive|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204|153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|inactive|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4.95.67|Scotland|    0.31|      HTTPS|Opera/8.

In [42]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, lead, lag, row_number, avg, sum

In [43]:
# Window: one partition per country, rows ordered by dlbytes, largest first.
window_spec = (
    Window
    .partitionBy("country")
    .orderBy(col("dlbytes").desc())
)

In [44]:
# Rank each row within the window_spec currently in scope.
df.withColumn(
    "rank",
    rank().over(window_spec),
).show()

+-------------------+-------------+-----------------+-------+-------+---------------+---------------+-------+--------+-----------+--------------------+--------+----+
|          date_time|       userid|           domain|dlbytes|ulbytes|       clientip|       serverip|country|txn_time|http_method|          user_agent|platform|rank|
+-------------------+-------------+-----------------+-------+-------+---------------+---------------+-------+--------+-----------+--------------------+--------+----+
|2023-10-01 03:19:03|2236747991119|edwards-patel.com| 999910| 283407|   78.178.15.26|   132.242.86.8|  Wales|    1.92|       HTTP|Mozilla/5.0 (iPod...|     Mac|   1|
|2023-10-01 23:03:56|4887900415183|obrien-davies.biz| 999862| 592320| 14.115.162.182| 75.140.101.114|  Wales|    0.56|       HTTP|Mozilla/5.0 (Wind...|   Linux|   2|
|2023-10-01 17:21:28|1006503238036|evans-clarke.info| 999847| 256124|  67.147.205.26| 174.149.86.105|  Wales|    1.18|      HTTPS|Mozilla/5.0 (Wind...| Windows|   3|
|202

In [45]:
# Window: one partition per country, rows ordered by date_time ascending.
window_spec = (
    Window
    .partitionBy("country")
    .orderBy(col("date_time"))
)

In [46]:
# Number the rows sequentially within the window_spec currently in scope.
df.withColumn(
    "row_number",
    row_number().over(window_spec),
).show()

+-------------------+-------------+-------------------+-------+-------+---------------+---------------+-------+--------+-----------+--------------------+--------+----------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|       serverip|country|txn_time|http_method|          user_agent|platform|row_number|
+-------------------+-------------+-------------------+-------+-------+---------------+---------------+-------+--------+-----------+--------------------+--------+----------+
|2023-10-01 00:00:01|0434470039855|    pritchard.co.uk| 350605| 860883|  70.236.57.196| 33.243.179.115|  Wales|     0.4|       HTTP|Mozilla/5.0 (Maci...| Android|         1|
|2023-10-01 00:01:15|5209250274907|          mason.com| 776949| 127553|    70.80.2.225| 210.37.227.198|  Wales|     0.7|      HTTPS|Mozilla/5.0 (Maci...|     iOS|         2|
|2023-10-01 00:03:28|8395569790526|          hayes.com| 901828| 251710|114.185.183.112|  14.33.205.237|  Wales|    0.51|      HTTP

In [47]:
# Window over the whole DataFrame (no partition columns), ordered by date_time.
window_spec = Window.orderBy("date_time")

In [48]:
# For every row, fetch dlbytes from the row one position later in the window.
df.withColumn(
    "next_bytes",
    lead("dlbytes", 1).over(window_spec),
).show()

+-------------------+-------------+--------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+----------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|next_bytes|
+-------------------+-------------+--------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+----------+
|2023-10-01 00:00:01|0434470039855|     pritchard.co.uk| 350605| 860883|  70.236.57.196|33.243.179.115|   Wales|     0.4|       HTTP|Mozilla/5.0 (Maci...| Android|     60865|
|2023-10-01 00:00:01|6242602789270|      ellis-hale.com|  60865| 378320|    65.219.1.13|     2.4.27.23| Ireland|    1.89|      HTTPS|Opera/9.61.(X11; ...|   Linux|    523144|
|2023-10-01 00:00:26|0013344619567|          rogers.com| 523144| 629889|   81.57.118.57|181.88.216.240| Ireland|    2.08|    

In [49]:
# Unordered window with one partition per userid.
# NOTE(review): df was de-duplicated on userid earlier in the notebook, so each
# partition holds a single row and an aggregate over this window simply
# repeats that row's own value.
window_spec = Window.partitionBy("userid")

In [50]:
# Attach the sum of dlbytes over the window_spec currently in scope to every row.
df.withColumn(
    "sumfunction",
    sum("dlbytes").over(window_spec),
).show()

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|sumfunction|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|     756778|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204|153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|     988049|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4.95.67|Scotland|    0.31|     

In [51]:
# Unordered window with one partition per domain.
window_spec = Window.partitionBy(col("domain"))

In [52]:
# Attach the mean of dlbytes over the window_spec currently in scope to every row.
df.withColumn(
    "avgbytes",
    avg("dlbytes").over(window_spec),
).show()

+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+--------+
|          date_time|       userid|              domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|avgbytes|
+-------------------+-------------+--------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+--------+
|2023-10-02 08:20:20|2765945342037|     adams-ahmed.com| 616224| 768126| 10.161.122.159|   159.83.81.10| Ireland|     2.3|      HTTPS|Mozilla/5.0 (comp...|   Linux|616224.0|
|2023-10-01 21:31:02|2826207025615|    adams-bell.co.uk| 435697| 834184|   65.7.157.224|  23.97.116.137| Ireland|    0.62|      HTTPS|Mozilla/5.0 (Wind...|   Linux|435697.0|
|2023-10-03 07:40:05|0602650180307|  adams-crawford.org| 479062| 520187|   191.11.47.18|203.207.117.158| England|     2.8|      HT

In [53]:
# Keep rows that are neither from Wales nor for the hopkins.org domain.
df.filter(
    (df["country"] != 'Wales') & (df["domain"] != "hopkins.org")
).show()

+-------------------+-------------+-------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+-------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157|  85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204| 153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|     40.4.95.67|Scotland|    0.31|      HTTPS|Opera/8.85.(X11; ...|     Mac|
|2023-10-05 02:4

In [54]:
# Keep only rows whose country is one of the listed values.
# NOTE(review): this rebinds `countries`, which earlier held a DataFrame.
countries = ["England", "Scotland"]

df.where(col("country").isin(*countries)).show()

+-------------------+-------------+-------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|       serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+-------------------+-------+-------+---------------+---------------+--------+--------+-----------+--------------------+--------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157|  85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|     40.4.95.67|Scotland|    0.31|      HTTPS|Opera/8.85.(X11; ...|     Mac|
|2023-10-05 02:49:31|0012989976074|           ryan.com| 186727| 285270|  97.61.198.125|  35.115.52.150| England|    1.09|      HTTPS|Opera/8.32.(X11; ...| Windows|
|2023-10-03 04:3

In [55]:
# Keep rows whose country matches the regular expression "tland".
df.where(col("country").rlike("tland")).show()

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4.95.67|Scotland|    0.31|      HTTPS|Opera/8.85.(X11; ...|     Mac|
|2023-10-02 08:19:43|0025484554969|  barnett-singh.com| 228620| 295478|  12.80.235.224|  20.62.31.155|Scotland|    1.43|      HTTPS|Mozilla/5.0 (iPho...| Windows|
|2023-10-03 02:01:19|0

In [56]:
from pyspark.sql.functions import udf

In [57]:
from pyspark.sql.types import IntegerType

In [58]:
def calculate_total_bytes(dlbytes, ulbytes):
    """Return the sum of downloaded and uploaded bytes for one row.

    Args:
        dlbytes: Download byte count, or None when the Spark value is NULL.
        ulbytes: Upload byte count, or None when the Spark value is NULL.

    Returns:
        dlbytes + ulbytes, or None if either input is None.
    """
    # Spark hands SQL NULLs to a Python UDF as None; adding None would raise
    # TypeError and abort the job, so propagate the NULL instead.
    if dlbytes is None or ulbytes is None:
        return None
    return dlbytes + ulbytes

# Wrap the plain Python function as a Spark UDF declared to return IntegerType.
total_bytes_udf = udf(calculate_total_bytes, returnType=IntegerType())

# Add a "total_bytes" column computed row by row from the two byte columns,
# then preview the result.
(
    df
    .withColumn("total_bytes", total_bytes_udf(df["dlbytes"], df["ulbytes"]))
    .show()
)

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|total_bytes|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|     993328|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204|153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|    1104776|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4.95.67|Scotland|    0.31|     

In [59]:
from pyspark.sql.types import StringType

In [60]:
def custom_text_processing(http_method):
    """Return the lower-cased form of an HTTP method string.

    Args:
        http_method: The method text (e.g. "HTTPS"), or None when the Spark
            value is NULL.

    Returns:
        The lower-cased string, or None if the input is None.
    """
    # Spark hands SQL NULLs to a Python UDF as None; None has no .lower(),
    # so propagate the NULL instead of raising AttributeError.
    if http_method is None:
        return None
    return http_method.lower()

# Wrap the plain Python function as a Spark UDF declared to return StringType.
custom_text_processing_udf = udf(custom_text_processing, returnType=StringType())

# Add a "processed_http" column holding the UDF's result for http_method,
# then preview the result.
(
    df
    .withColumn("processed_http", custom_text_processing_udf(df["http_method"]))
    .show()
)

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+--------------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|processed_http|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+--------------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|         https|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204|153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|          http|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4.95.67|Scotland

In [61]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import round

In [62]:
def new_txn_time(txn_time):
    """Scale a transaction time by 1000.

    Args:
        txn_time: The numeric transaction time, or None when the Spark value
            is NULL.

    Returns:
        txn_time * 1000, or None if the input is None.
    """
    # Spark hands SQL NULLs to a Python UDF as None; propagate the NULL
    # instead of raising TypeError on None * 1000.
    if txn_time is None:
        return None
    return txn_time*1000

new_txn_time_udf = udf(new_txn_time, DoubleType())

# Apply the registered UDF rather than calling the plain Python function on
# the Column, which left new_txn_time_udf defined but unused.
# The cast guarantees the UDF receives a Python float whatever type the
# txn_time column currently has, matching the declared DoubleType result.
# `round` here is pyspark.sql.functions.round (imported in the previous cell),
# not the Python builtin.
df.withColumn(
    "txn_time_millisec",
    round(new_txn_time_udf(df["txn_time"].cast("double")), 0),
).show()

+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------------+
|          date_time|       userid|             domain|dlbytes|ulbytes|       clientip|      serverip| country|txn_time|http_method|          user_agent|platform|txn_time_millisec|
+-------------------+-------------+-------------------+-------+-------+---------------+--------------+--------+--------+-----------+--------------------+--------+-----------------+
|2023-10-01 11:05:24|0004910615244|         robson.org| 756778| 236550| 35.242.245.157| 85.188.139.63|Scotland|    0.83|      HTTPS|Opera/9.66.(Windo...|     iOS|            830.0|
|2023-10-02 18:02:00|0007469702985|          smith.biz| 988049| 116727|  86.195.34.204|153.106.210.57| Ireland|    1.18|       HTTP|Mozilla/5.0 (comp...|   Linux|           1180.0|
|2023-10-05 16:34:30|0009633130876|davies-phillips.com| 546244| 819216|   37.140.78.51|    40.4

In [63]:
from pyspark.sql.functions import year, month, dayofmonth, date_format

In [64]:
# Keep only the date_time column for the date-function examples below.
df2 = df.select(df["date_time"])
df2.show()

+-------------------+
|          date_time|
+-------------------+
|2023-10-04 06:21:05|
|2023-10-03 23:34:32|
|2023-10-01 07:30:22|
|2023-10-02 20:14:20|
|2023-10-03 17:53:45|
|2023-10-05 22:07:58|
|2023-10-02 10:45:43|
|2023-10-02 06:26:16|
|2023-10-03 12:55:00|
|2023-10-01 11:05:24|
|2023-10-04 04:31:36|
|2023-10-05 02:30:26|
|2023-10-05 15:06:32|
|2023-10-02 09:27:40|
|2023-10-03 21:14:25|
|2023-10-04 00:30:59|
|2023-10-04 16:00:00|
|2023-10-02 18:02:00|
|2023-10-01 14:18:26|
|2023-10-01 16:03:24|
+-------------------+
only showing top 20 rows



In [65]:
# Split date_time into year / month / day-of-month columns and add a
# date-only string formatted as yyyy-MM-dd.
(
    df2
    .withColumn("year", year(df2["date_time"]))
    .withColumn("month", month(df2["date_time"]))
    .withColumn("day", dayofmonth(df2["date_time"]))
    .withColumn("new_date", date_format(df2["date_time"], "yyyy-MM-dd"))
    .show()
)

+-------------------+----+-----+---+----------+
|          date_time|year|month|day|  new_date|
+-------------------+----+-----+---+----------+
|2023-10-04 06:21:05|2023|   10|  4|2023-10-04|
|2023-10-03 23:34:32|2023|   10|  3|2023-10-03|
|2023-10-01 07:30:22|2023|   10|  1|2023-10-01|
|2023-10-02 20:14:20|2023|   10|  2|2023-10-02|
|2023-10-03 17:53:45|2023|   10|  3|2023-10-03|
|2023-10-05 22:07:58|2023|   10|  5|2023-10-05|
|2023-10-02 10:45:43|2023|   10|  2|2023-10-02|
|2023-10-02 06:26:16|2023|   10|  2|2023-10-02|
|2023-10-03 12:55:00|2023|   10|  3|2023-10-03|
|2023-10-01 11:05:24|2023|   10|  1|2023-10-01|
|2023-10-04 04:31:36|2023|   10|  4|2023-10-04|
|2023-10-05 02:30:26|2023|   10|  5|2023-10-05|
|2023-10-05 15:06:32|2023|   10|  5|2023-10-05|
|2023-10-02 09:27:40|2023|   10|  2|2023-10-02|
|2023-10-03 21:14:25|2023|   10|  3|2023-10-03|
|2023-10-04 00:30:59|2023|   10|  4|2023-10-04|
|2023-10-04 16:00:00|2023|   10|  4|2023-10-04|
|2023-10-02 18:02:00|2023|   10|  2|2023

In [66]:
from pyspark.sql.functions import dayofweek, when

In [67]:
# Start the weekday/weekend exercise from the date_time column alone.
challenge = df.select(df["date_time"])

In [68]:
# Add the day-of-week number for each date_time as "daynumber".
step = challenge.withColumn(
    "daynumber",
    dayofweek("date_time"),
)

In [69]:
# dayofweek numbers days from 1 (Sunday) to 7 (Saturday), so 2-6 is Mon-Fri.
# A null daynumber makes `between` evaluate to null; with a bare otherwise()
# such rows would be labelled "weekend". The explicit second `when` labels
# only real day numbers and leaves null rows as null.
step.withColumn(
    "weekend",
    when(step["daynumber"].between(2, 6), "weekday")
    .when(step["daynumber"].isNotNull(), "weekend"),
).show()

+-------------------+---------+-------+
|          date_time|daynumber|weekend|
+-------------------+---------+-------+
|2023-10-04 06:21:05|        4|weekday|
|2023-10-03 23:34:32|        3|weekday|
|2023-10-01 07:30:22|        1|weekend|
|2023-10-02 20:14:20|        2|weekday|
|2023-10-03 17:53:45|        3|weekday|
|2023-10-05 22:07:58|        5|weekday|
|2023-10-02 10:45:43|        2|weekday|
|2023-10-02 06:26:16|        2|weekday|
|2023-10-03 12:55:00|        3|weekday|
|2023-10-01 11:05:24|        1|weekend|
|2023-10-04 04:31:36|        4|weekday|
|2023-10-05 02:30:26|        5|weekday|
|2023-10-05 15:06:32|        5|weekday|
|2023-10-02 09:27:40|        2|weekday|
|2023-10-03 21:14:25|        3|weekday|
|2023-10-04 00:30:59|        4|weekday|
|2023-10-04 16:00:00|        4|weekday|
|2023-10-02 18:02:00|        2|weekday|
|2023-10-01 14:18:26|        1|weekend|
|2023-10-01 16:03:24|        1|weekend|
+-------------------+---------+-------+
only showing top 20 rows



END