In [1]:
from pyspark.sql import SparkSession
import requests
import json
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql import Row

24/11/27 20:04:15 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [16]:
# Request defaults: a JSON content-type header and an empty JSON object ("{}") as payload.
body = json.dumps(dict())
headers = {"content-type": "application/json"}

In [12]:
def executeRestApi(verb, url, headers, body, timeout=30):
    """Call a REST endpoint and return its payload in a fixed four-key shape.

    Every return path yields a dict with the keys ``Count``, ``Message``,
    ``SearchCriteria`` and ``Results`` so the value always fits one schema.

    Args:
        verb: HTTP method name, "get" or "post" (case-insensitive, surrounding
            whitespace ignored). Anything else yields an "Invalid verb" result.
        url: Target URL.
        headers: Mapping of HTTP headers sent with the request.
        body: JSON-encoded string. For GET it is decoded and sent as query
            parameters (an empty or missing body sends none); for POST it is
            sent unchanged as the request data.
        timeout: Seconds passed to ``requests`` as the request timeout, so a
            stalled server cannot block the caller forever.

    Returns:
        dict: On HTTP 200, the four keys taken from the JSON response (missing
        keys default to 0 / "" / "" / []). Otherwise ``Count`` is 0,
        ``Results`` is empty and ``Message`` is "Invalid verb",
        "Failed request", or the text of the exception that was raised.
    """
    def failure(message):
        # Same keys as a successful result so all outcomes share one shape.
        return {"Count": 0, "Message": message, "SearchCriteria": "", "Results": []}

    # Accept "GET", "Post", " get " ...; a non-string verb is simply invalid.
    method = verb.strip().lower() if isinstance(verb, str) else ""
    if method not in ("get", "post"):
        return failure("Invalid verb")

    try:
        if method == "get":
            # An empty/None body means "no query parameters" rather than a
            # JSON decoding error.
            params = json.loads(body) if body else None
            res = requests.get(url, headers=headers, params=params, timeout=timeout)
        else:
            res = requests.post(url, headers=headers, data=body, timeout=timeout)

        if res.status_code != 200:
            return failure("Failed request")

        data = res.json()
        return {
            "Count": data.get("Count", 0),
            "Message": data.get("Message", ""),
            "SearchCriteria": data.get("SearchCriteria", ""),
            "Results": data.get("Results", []),
        }
    except Exception as e:
        # Deliberately broad: any transport, timeout or decoding problem is
        # reported through the Message field instead of raising to the caller.
        return failure(str(e))

In [13]:
# Spark return type for executeRestApi: the four top-level response fields,
# with Results as an array of (Make_ID, Make_Name) structs.
schema = StructType([
  StructField("Count", IntegerType(), True),
  StructField("Message", StringType(), True),
  StructField("SearchCriteria", StringType(), True),
  StructField("Results", ArrayType(
    StructType([
      StructField("Make_ID", IntegerType()),
      StructField("Make_Name", StringType())
    ])
  ))
])

# The wrapped function performs an HTTP request, so its result is not a pure
# function of its arguments. Spark assumes UDFs are deterministic by default
# and may then evaluate them more (or fewer) times than they appear in the
# query; marking the UDF non-deterministic keeps the optimizer from
# duplicating the REST call.
udf_executeRestApi = udf(executeRestApi, schema).asNondeterministic()
spark = SparkSession.builder.appName("UDF REST Demo").getOrCreate()

In [17]:
# Describe one REST call as a Row (verb, url, headers, body) and let the UDF
# execute it when the "execute" column is evaluated.
RestApiRequest = Row("verb", "url", "headers", "body")
RequestApiFunc = RestApiRequest(
    "get",
    "https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json",
    headers,
    json.dumps({}),
)
request_df = spark.createDataFrame([RequestApiFunc]).withColumn(
    "execute",
    udf_executeRestApi(col("verb"), col("url"), col("headers"), col("body")),
)

# Explode the Results array into one row per element, then show the first 50
# (Make_ID, Make_Name) pairs.
(
    request_df
    .select(explode(col("execute.Results")).alias("results"))
    .select(col("results.Make_ID"), col("results.Make_Name"))
    .show(50)
)



+-------+--------------------+
|Make_ID|           Make_Name|
+-------+--------------------+
|  12858|   #1 ALPINE CUSTOMS|
|   4877|  1/OFF KUSTOMS, LLC|
|  11257| 102 IRONWORKS, INC.|
|  12255|12832429 CANADA INC.|
|  13053| 137 INDUSTRIES INC.|
|   6387|17 CREEK ENTERPRISES|
|  12948|  1955 CUSTOM BELAIR|
|   9172|1M CUSTOM CAR TRA...|
|   6124|1ST CHOICE MANUFA...|
|  12972|     2 GOLDEN EAGLES|
|   6488|  2-G TRAILER CO LLC|
|  11399|24/7 ONSITE CAMER...|
|    608|        280 TRAILERS|
|  10123|  3 CUSTOM SOLUTIONS|
|  11253|      3 STAR MFG LTD|
|   8792|3&1 ENTERPRISES, ...|
|   7388|33 EAST MAINTENAN...|
|  10005|      357 GOLF CARTS|
|  11527|         36 FEET LLC|
|  12488| 360 CUSTOM TRAILERS|
|   9857|3C CATTLE FEEDERS...|
|    675|  3D CUSTOM ALUMINUM|
|   4800|3M DYNAMIC MESSAG...|
|  12538|           3PLUSCOCO|
|    674|             3T MFG.|
|  11657|              4 BOSS|
|   8195|  4 STAR FABRICATION|
|  10980|                4 W |
|  12443|              4 WIDE|
|   8175

                                                                                