In [1]:
!pip install pyspark==3.5.3

Collecting pyspark==3.5.3
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840629 sha256=3d7eec46b57de82649aceb5dd522fb19466855a9a91ab94b8b9f535741c79f4e
  Stored in directory: /root/.cache/pip/wheels/07/a0/a3/d24c94bf043ab5c7e38c30491199a2a11fef8d2584e6df7fb7
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.1
    Uninstalling pyspark-3.5.1:
      Successfully uninstalled pyspark-3.5.1
Successfully installed pyspark-3.5.3


In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()

In [3]:
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt

--2025-11-21 02:36:40--  https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt
Resolving s3-geospatial.s3.us-west-2.amazonaws.com (s3-geospatial.s3.us-west-2.amazonaws.com)... 3.5.82.196, 52.92.178.170, 3.5.84.104, ...
Connecting to s3-geospatial.s3.us-west-2.amazonaws.com (s3-geospatial.s3.us-west-2.amazonaws.com)|3.5.82.196|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 286779 (280K) [text/plain]
Saving to: ‘transfer_cost.txt’


2025-11-21 02:36:41 (1.35 MB/s) - ‘transfer_cost.txt’ saved [286779/286779]



In [4]:
!ls -tl

total 288
drwxr-xr-x 1 root root   4096 Nov 17 14:29 sample_data
-rw-r--r-- 1 root root 286779 Apr 24  2022 transfer_cost.txt


In [5]:
!head -5 transfer_cost.txt

On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today
On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today


In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")

In [7]:
transfer_cost_df.show(truncate=False)

+---------------------------------------------------------------------------+
|text                                                                       |
+---------------------------------------------------------------------------+
|On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling |
|On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today  |
|On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today  |
|On 2021-01-04 the cost per ton from 85001 to 85012 is $18.98 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85013 is 26.64 at Haul Today  |
|On 2021-01-04 the cost per ton from 85001 to 85020 is 26.34 at ABC Hauling |
|On 2021-01-04 the cost per ton from 85001 to 85021 is $20.15 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85002 to 85001 is 21.57 at 

In [8]:
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'

df_with_new_columns = transfer_cost_df\
    .withColumn('week', regexp_extract('text', regex_str, 1))\
    .withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
    .withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
    .withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
    .withColumn('vendor', regexp_extract(col('text'), regex_str, 5))

In [9]:
df_with_new_columns.printSchema()

root
 |-- text: string (nullable = true)
 |-- week: string (nullable = true)
 |-- departure_zipcode: string (nullable = true)
 |-- arrival_zipcode: string (nullable = true)
 |-- cost: string (nullable = true)
 |-- vendor: string (nullable = true)



In [10]:
final_df = df_with_new_columns.drop("text")

In [11]:
final_df.write.csv("extracted.csv")

In [12]:
!ls -tl

total 292
drwxr-xr-x 2 root root   4096 Nov 21 02:40 extracted.csv
drwxr-xr-x 1 root root   4096 Nov 17 14:29 sample_data
-rw-r--r-- 1 root root 286779 Apr 24  2022 transfer_cost.txt


In [13]:
!ls -tl extracted.csv/

total 156
-rw-r--r-- 1 root root      0 Nov 21 02:40 _SUCCESS
-rw-r--r-- 1 root root 156423 Nov 21 02:40 part-00000-54033fdf-4094-4451-89d1-ed04e3e4af53-c000.csv


In [14]:
!head -5 extracted.csv/part-00000-54033fdf-4094-4451-89d1-ed04e3e4af53-c000.csv

2021-01-04,85001,85002,$28.32,ABC Hauling
2021-01-04,85001,85004,$25.68,ABC Hauling
2021-01-04,85001,85007,19.86,ABC Hauling
2021-01-04,85001,85007,20.52,Haul Today
2021-01-04,85001,85010,20.72,Haul Today


In [15]:
final_df.write.format("json").save("extracted.json")

In [16]:
!ls -tl extracted.json/

total 428
-rw-r--r-- 1 root root      0 Nov 21 02:43 _SUCCESS
-rw-r--r-- 1 root root 436305 Nov 21 02:43 part-00000-2d2e86de-b7bc-466a-a6ae-8fe80b02d6b1-c000.json


In [17]:
!head -1 extracted.json/part-00000-2d2e86de-b7bc-466a-a6ae-8fe80b02d6b1-c000.json

{"week":"2021-01-04","departure_zipcode":"85001","arrival_zipcode":"85002","cost":"$28.32","vendor":"ABC Hauling"}
