In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

_COMMENTS_API_URL = "https://jsonplaceholder.typicode.com/comments"
_DEFAULT_TIMEOUT_SECONDS = 10


def _comments_schema():
    """Return the Spark schema applied to records fetched from the comments API."""
    return StructType([
        StructField("postId", IntegerType(), True),
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("body", StringType(), True),
    ])


def _request_comments_page(limit, page, timeout):
    """Fetch a single page from the comments API and return its decoded JSON body."""
    response = requests.get(
        _COMMENTS_API_URL,
        params={"_limit": limit, "_page": page},
        timeout=timeout,  # requests applies no timeout by default; avoid hanging the notebook
    )
    response.raise_for_status()  # raises HTTPError for 4xx/5xx responses
    return response.json()


def _fetch_comment_records(limit, page, timeout):
    """Download raw comment records; return None when the API call fails.

    With a page number, only that page is requested. With ``page=None``, pages
    are requested from 1 upwards until the API answers with an empty page.
    """
    try:
        if page is not None:
            return _request_comments_page(limit, page, timeout)

        records = []
        current_page = 1
        while True:
            batch = _request_comments_page(limit, current_page, timeout)
            if not batch:
                # An empty page marks the end of the data.
                return records
            records.extend(batch)
            current_page += 1
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a body that is not valid JSON; depending on the
        # installed requests release that error is not a RequestException.
        print(f"API request failed: {exc}")
        return None


def fetch_api_data(limit=100, page=None, table_name="api_comments_table",
                   timeout=_DEFAULT_TIMEOUT_SECONDS):
    """Fetch comments from the JSONPlaceholder API as a Spark DataFrame.

    Args:
        limit: Page size, sent as the ``_limit`` query parameter.
        page: Page number, sent as ``_page``. ``None`` fetches every page,
            starting at page 1 and stopping at the first empty page.
        table_name: Spark table returned as a fallback when the API yields
            no records.
        timeout: Seconds allowed for each HTTP request.

    Returns:
        A DataFrame built from the API records; the contents of ``table_name``
        when the API returned no records; or ``None`` when the request failed
        (the failure reason is printed).

    Note:
        The fallback ``spark.table(table_name)`` lookup is not guarded here, so
        Spark's own error propagates if that table does not exist.
    """
    spark = SparkSession.builder.getOrCreate()

    records = _fetch_comment_records(limit, page, timeout)
    if records is None:
        return None

    if not records:
        if page is not None:
            # Reported for any empty page: how many pages hold data depends on
            # `limit`, so no fixed page number can be used as the threshold.
            print(f"No data found on page {page}. Pagination complete. Returning collected data.")
        return spark.table(table_name)

    return spark.createDataFrame(records, _comments_schema())


def save_all_data_to_table(limit=100, table_name="api_comments_table",
                           timeout=_DEFAULT_TIMEOUT_SECONDS):
    """Fetch every page from the API and overwrite ``table_name`` with the result.

    Nothing is written when the request fails or the API returns no records.
    The empty case is skipped on purpose: the only data available then would
    be the existing table itself, and overwriting a table with a DataFrame
    that reads from that same table is at best a pointless rewrite
    (NOTE(review): Spark is expected to reject it for some table formats -
    confirm for the format in use).

    Args:
        limit: Page size used while paging through the API.
        table_name: Destination Spark table (overwritten).
        timeout: Seconds allowed for each HTTP request.
    """
    spark = SparkSession.builder.getOrCreate()

    records = _fetch_comment_records(limit, None, timeout)
    if not records:  # None (request failed) or [] (API had nothing to offer)
        print("No data found to save.")
        return

    df = spark.createDataFrame(records, _comments_schema())
    df.write.mode("overwrite").saveAsTable(table_name)
    # One DataFrame row is created per fetched record, so the list length is
    # the written row count; no extra Spark job is needed to report it.
    print(f" Data saved to table '{table_name}' (records: {len(records)})")

def test_source_destination_count(api_url: str, table_name: str, timeout: float = 10) -> str:
    """
    Test if the count of API data matches the count of data in Spark table.

    Args:
    - api_url: endpoint expected to return every record in a single JSON response
    - table_name: Spark table whose row count is compared against the API
    - timeout: seconds allowed for the HTTP request

    Returns:
    - "PASS" if counts match
    - "FAIL" if counts differ
    - Or a "FAIL: ..." message string if the request fails, the response is
      empty or not valid JSON, or the table cannot be read
    """
    # Function-scope import: only this check needs the exception type.
    from pyspark.sql.utils import AnalysisException

    spark = SparkSession.builder.getOrCreate()

    try:
        # Fetch all API data in a single call (if supported)
        response = requests.get(api_url, timeout=timeout)
        # Without this, the JSON body of an HTTP error would be counted as data.
        response.raise_for_status()
        api_data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON; depending on the
        # installed requests release that error is not a RequestException.
        return f"FAIL: API request error: {e}"

    if not api_data:
        return "FAIL: API returned empty data"

    api_count = len(api_data)

    try:
        dest_count = spark.table(table_name).count()
    except AnalysisException as e:
        # Raised by Spark when the table cannot be resolved (e.g. it was never saved).
        return f"FAIL: Table read error: {e}"

    # Compare counts
    if api_count == dest_count:
        print(f" Test Passed: API count ({api_count}) == Table count ({dest_count})")
        return "PASS"

    print(f" Test Failed: API count ({api_count}) != Table count ({dest_count})")
    return "FAIL"

In [0]:
# Preview a single page from the API.
data1 = fetch_api_data(limit=100, page=3, table_name="api_comments_table")
if data1 is None:
    # fetch_api_data returns None when the request fails; calling .show() on
    # it would raise AttributeError and stop the cell.
    print("Page preview skipped: fetch_api_data returned no DataFrame.")
else:
    data1.show(5)
    print(data1.count())

# Reload the table from every API page, then compare its row count with the API.
save_all_data_to_table(limit=100, table_name="api_comments_table")

result = test_source_destination_count(api_url="https://jsonplaceholder.typicode.com/comments", table_name="api_comments_table")
print("Test result:", result)

+------+---+--------------------+-----------------+--------------------+
|postId| id|                name|            email|                body|
+------+---+--------------------+-----------------+--------------------+
|    41|201|et adipisci aliqu...|   Cleve@royal.us|est officiis plac...|
|    41|202|blanditiis vel fu...|Donnell@polly.net|sequi expedita qu...|
|    41|203|ab enim adipisci ...|  Bonita@karl.biz|eum voluptates id...|
|    41|204|autem voluptates ...|Shea@angelina.biz|voluptatibus pari...|
|    41|205|et reiciendis ull...|Omari@veronica.us|voluptatem accusa...|
+------+---+--------------------+-----------------+--------------------+
only showing top 5 rows

100
 Data saved to table 'api_comments_table' (records: 500)
 Test Passed: API count (500) == Table count (500)
Test result: PASS


In [0]:
# Preview the first five rows of the saved comments table.
(
    spark.sql("SELECT * FROM api_comments_table")
    .show(n=5)
)

+------+---+--------------------+--------------------+--------------------+
|postId| id|                name|               email|                body|
+------+---+--------------------+--------------------+--------------------+
|     1|  1|id labore ex et q...|  Eliseo@gardner.biz|laudantium enim q...|
|     1|  2|quo vero reiciend...|Jayne_Kuhic@sydne...|est natus enim ni...|
|     1|  3|odio adipisci rer...| Nikita@garfield.biz|quia molestiae re...|
|     1|  4|      alias odio sit|       Lew@alysha.tv|non et atque\nocc...|
|     1|  5|vero eaque aliqui...|   Hayden@althea.biz|harum non quasi e...|
+------+---+--------------------+--------------------+--------------------+
only showing top 5 rows



In [0]:
# The LIMIT clause caps the result at six rows, so show() runs with its defaults.
(
    spark.sql("SELECT * FROM api_comments_table LIMIT 6")
    .show()
)

+------+---+--------------------+--------------------+--------------------+
|postId| id|                name|               email|                body|
+------+---+--------------------+--------------------+--------------------+
|     1|  1|id labore ex et q...|  Eliseo@gardner.biz|laudantium enim q...|
|     1|  2|quo vero reiciend...|Jayne_Kuhic@sydne...|est natus enim ni...|
|     1|  3|odio adipisci rer...| Nikita@garfield.biz|quia molestiae re...|
|     1|  4|      alias odio sit|       Lew@alysha.tv|non et atque\nocc...|
|     1|  5|vero eaque aliqui...|   Hayden@althea.biz|harum non quasi e...|
|     2|  6|et fugit eligendi...|Presley.Mueller@m...|doloribus at sed ...|
+------+---+--------------------+--------------------+--------------------+



In [0]:
%sql
-- Preview five rows of the saved comments table.
SELECT *
FROM api_comments_table
LIMIT 5

postId,id,name,email,body
1,1,id labore ex et quam laborum,Eliseo@gardner.biz,laudantium enim quasi est quidem magnam voluptate ipsam eos tempora quo necessitatibus dolor quam autem quasi reiciendis et nam sapiente accusantium
1,2,quo vero reiciendis velit similique earum,Jayne_Kuhic@sydney.com,est natus enim nihil est dolore omnis voluptatem numquam et omnis occaecati quod ullam at voluptatem error expedita pariatur nihil sint nostrum voluptatem reiciendis et
1,3,odio adipisci rerum aut animi,Nikita@garfield.biz,quia molestiae reprehenderit quasi aspernatur aut expedita occaecati aliquam eveniet laudantium omnis quibusdam delectus saepe quia accusamus maiores nam est cum et ducimus et vero voluptates excepturi deleniti ratione
1,4,alias odio sit,Lew@alysha.tv,non et atque occaecati deserunt quas accusantium unde odit nobis qui voluptatem quia voluptas consequuntur itaque dolor et qui rerum deleniti ut occaecati
1,5,vero eaque aliquid doloribus et culpa,Hayden@althea.biz,harum non quasi et ratione tempore iure ex voluptates in ratione harum architecto fugit inventore cupiditate voluptates magni quo et
