# Finding the Treatment Group

Prerequisites
1. A `config.py` file in the `config` folder, in the following format:
ACCESS_TOKEN = "ghp_xxxx"
GITHUB_TOKEN = "github_pat_xxxxx"
2. A `data` folder where all data will be stored

#### Libraries

In [50]:
import pandas as pd
import requests
import os
import sys
from config import config
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import logging
from typing import List, Union
import time

In [51]:
# Pull both GitHub credentials out of the local config module in one step.
access_token, github_token = config.ACCESS_TOKEN, config.GITHUB_TOKEN

In [52]:
import os
import sys

# Spark starts its Python processes with the interpreter named in these two
# variables. Use the interpreter that is running this notebook rather than a
# machine-specific absolute path, so the notebook is not tied to one
# user's environment.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [53]:
# Stop a Spark session left over from an earlier run so that the next cell can
# build a fresh one. On a fresh kernel `spark` is not defined yet, so guard the
# call instead of failing with NameError.
# NOTE: DataFrames created from the stopped session cannot be used afterwards
# and have to be re-created from the new session.
if "spark" in globals():
    spark.stop()

In [54]:
# Initialization
## !!!! CHANGE THE MEMORY FOR THE FINAL MACHINE !!!!!
# Build (or reuse) the session; the driver memory is set to 6g here.
spark = (
    SparkSession.builder
    .appName("LargeJSONProcessing")
    .config("spark.driver.memory", "6g")
    .getOrCreate()
)


#### Download the example file

In [4]:
# Ensure the 'data' folder exists (no error if it is already there)
os.makedirs('data', exist_ok=True)

url = 'https://data.gharchive.org/2023-04-01-15.json.gz'
file_path = os.path.join('data', '2023-04-01-15.json.gz')

# Check if the file already exists
if os.path.exists(file_path):
    print(f"The file already exists at {file_path}. No need to download.")
else:
    # The timeout keeps the cell from hanging forever on a stalled connection;
    # the `with` block releases the connection when the download is done.
    with requests.get(url, stream=True, timeout=60) as response:
        # Check if the request was successful (HTTP Status Code 200)
        if response.status_code == 200:
            # Stream into a temporary name and rename only once the download is
            # complete. Otherwise an interrupted download would leave a
            # truncated file that the "already exists" check above accepts on
            # the next run.
            partial_path = file_path + '.part'
            try:
                with open(partial_path, 'wb') as file:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        file.write(chunk)
                os.replace(partial_path, file_path)
            finally:
                # Only still present if the download did not complete.
                if os.path.exists(partial_path):
                    os.remove(partial_path)
            print(f"File downloaded successfully and saved to {file_path}")
        else:
            print("Failed to fetch the file")

The file already exists at data\2023-04-01-15.json.gz. No need to download.


#### read in only the example file

In [6]:
# Read the data using Spark
df_spark = spark.read.json(file_path)
df_spark.show(5)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|       type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|{https://avatars....|2023-04-01T15:00:00Z|28137501092|{https://avatars....|{NULL, e743bdd287...|  true|{568277185, stdli...|  PushEvent|
|{https://avatars....|2023-04-01T15:00:00Z|28137501094|                NULL|{NULL, NULL, NULL...|  true|{622248753, ishud...|CreateEvent|
|{https://avatars....|2023-04-01T15:00:00Z|28137501097|                NULL|{NULL, NULL, NULL...|  true|{622248756, bxbao...|CreateEvent|
|{https://avatars....|2023-04-01T15:00:00Z|28137501098|                NULL|{NULL, 0233f4a6e9...|  true|{622201481, alwaz...|  PushEvent|
|{https://avatars....|2023-04-01T1

### Download the files

In [14]:
import os
import requests
import gzip
import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO)

BASE_URL = "https://data.gharchive.org"

def ensure_directory_exists(dir_name: str) -> None:
    """Create ``dir_name`` (and any missing parent directories) if needed.

    Calling this for a directory that already exists is a no-op.

    Args:
        dir_name: Path of the directory to create.

    Raises:
        FileExistsError: If ``dir_name`` exists but is not a directory.
    """
    # exist_ok=True removes the window between "check" and "create" in which
    # another process could create the directory and make makedirs fail.
    os.makedirs(dir_name, exist_ok=True)

def download_file(url: str, file_path: str, timeout: float = 60.0) -> None:
    """Download ``url`` and save it to ``file_path``.

    The body is streamed into ``file_path + '.part'`` and renamed to
    ``file_path`` only after the whole response has been written. A failed or
    interrupted download therefore never leaves a truncated file at
    ``file_path``.

    Args:
        url: Address of the file to download.
        file_path: Destination path on disk.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        None. A non-200 response is logged as a warning and a
        ``requests.RequestException`` is logged as an error; in both cases
        ``file_path`` is not created.
    """
    partial_path = f"{file_path}.part"
    try:
        # Using the response as a context manager releases the connection.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                logging.warning(f"File not found: {url}")
                return
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        # Publish the file under its final name only once it is complete.
        os.replace(partial_path, file_path)
    except requests.RequestException as e:
        logging.error(f"Failed to fetch {url}: {str(e)}")
    finally:
        # The partial file only still exists if the download did not finish.
        if os.path.exists(partial_path):
            os.remove(partial_path)

def load_or_fetch_data(file_path: str, url: str) -> Union[pd.DataFrame, None]:
    """Return the JSON-lines content of a gzipped file as a DataFrame.

    If ``file_path`` is not on disk yet, it is first downloaded from ``url``.
    ``None`` is returned when the file is still missing after the download
    attempt.
    """
    if os.path.exists(file_path):
        logging.info(f"File {file_path} already exists. Loading data.")
    else:
        logging.info(f"File {file_path} not exists. Downloading from {url}")
        download_file(url, file_path)
        # The download helper does not raise on failure, so check the result.
        if not os.path.exists(file_path):
            return None

    # Single read path shared by the "already present" and "just fetched" cases.
    with gzip.open(file_path, 'rt', encoding='utf-8') as handle:
        return pd.read_json(handle, lines=True)

def download_gh_archive(start_day: int, end_day: int, 
                        start_hour: int=0, end_hour: int=23,
                        base_url: str=BASE_URL, data_dir: str='data',
                        year: int=2023, month: int=4) -> pd.DataFrame:
    """Load GitHub archive data for the given days and hours of one month.

    One file per (day, hour) pair is loaded from ``data_dir`` or downloaded
    from ``base_url``. The hour window ``start_hour..end_hour`` is applied to
    every day in ``start_day..end_day`` (both ranges inclusive). It is not a
    continuous span from "start day/hour" to "end day/hour".

    Args:
        start_day: First day of the month to load.
        end_day: Last day of the month to load.
        start_hour: First hour to load on each day.
        end_hour: Last hour to load on each day.
        base_url: Base address the file names are appended to.
        data_dir: Local directory used to store and look up the files.
        year: Year used in the file names (defaults to 2023).
        month: Month used in the file names (defaults to 4, i.e. April).

    Returns:
        All loaded frames concatenated with a fresh index, or an empty
        DataFrame when no file could be loaded.
    """
    if start_day > end_day or start_hour > end_hour:
        # Either range() below would be empty; say so instead of silently
        # returning an empty frame.
        logging.warning(
            f"Empty range requested (days {start_day}-{end_day}, "
            f"hours {start_hour}-{end_hour}); no data will be loaded."
        )

    ensure_directory_exists(data_dir)

    frames = []
    for day in range(start_day, end_day + 1):
        for hour in range(start_hour, end_hour + 1):
            # The hour is deliberately not zero-padded in the file name.
            filename = f"{year:04d}-{month:02d}-{day:02d}-{hour}.json.gz"
            url = f"{base_url}/{filename}"
            file_path = os.path.join(data_dir, filename)

            df = load_or_fetch_data(file_path, url)
            if df is not None:
                frames.append(df)

    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


In [None]:
from datetime import datetime

# Record the start time
start_time = time.time()
# Define the start and end date-time
start_date_time = "2023-04-01 01"  # Format: "YYYY-MM-DD HH"
end_date_time = "2023-04-01 12"    # Format: "YYYY-MM-DD HH"
# Parse the strings against the documented format; strptime raises ValueError
# on malformed input instead of silently picking the wrong substring.
date_time_format = "%Y-%m-%d %H"
start_dt = datetime.strptime(start_date_time, date_time_format)
end_dt = datetime.strptime(end_date_time, date_time_format)
start_day, start_hour = start_dt.day, start_dt.hour
end_day, end_hour = end_dt.day, end_dt.hour

# Download data for the specified time frame
# NOTE: only the day and hour are passed on in this call; the year and month
# of the strings above are not forwarded to download_gh_archive.
df_big = download_gh_archive(start_day, end_day, start_hour, end_hour)

# Record the end time and calculate the elapsed time
end_time = time.time()
elapsed_time = end_time - start_time

print(f"Data from {start_date_time} to {end_date_time} loaded into DataFrame!")
print(f"Time taken: {elapsed_time:.2f} seconds")

INFO:root:File data\2023-04-01-1.json.gz not exists. Downloading from https://data.gharchive.org/2023-04-01-1.json.gz
INFO:root:File data\2023-04-01-2.json.gz not exists. Downloading from https://data.gharchive.org/2023-04-01-2.json.gz
INFO:root:File data\2023-04-01-3.json.gz not exists. Downloading from https://data.gharchive.org/2023-04-01-3.json.gz
INFO:root:File data\2023-04-01-4.json.gz not exists. Downloading from https://data.gharchive.org/2023-04-01-4.json.gz
INFO:root:File data\2023-04-01-5.json.gz not exists. Downloading from https://data.gharchive.org/2023-04-01-5.json.gz
INFO:root:File data\2023-04-01-6.json.gz not exists. Downloading from https://data.gharchive.org/2023-04-01-6.json.gz


### Read in all files

In [29]:
# Directory containing the files
data_dir = 'data'
# Collect the path of every entry named like 2023*.json.gz
files = []
for entry in os.listdir(data_dir):
    if entry.startswith('2023') and entry.endswith('.json.gz'):
        files.append(os.path.join(data_dir, entry))

# Hand the whole list to Spark so all files end up in one DataFrame
df_spark = spark.read.json(files)

# Optional: If you're planning on performing multiple operations on the DataFrame, cache it
# df_spark.cache()

# Show the first 5 rows as a sample
df_spark.show(5)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|       type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|{https://avatars....|2023-04-01T15:00:00Z|28137501092|{https://avatars....|{NULL, e743bdd287...|  true|{568277185, stdli...|  PushEvent|
|{https://avatars....|2023-04-01T15:00:00Z|28137501094|                NULL|{NULL, NULL, NULL...|  true|{622248753, ishud...|CreateEvent|
|{https://avatars....|2023-04-01T15:00:00Z|28137501097|                NULL|{NULL, NULL, NULL...|  true|{622248756, bxbao...|CreateEvent|
|{https://avatars....|2023-04-01T15:00:00Z|28137501098|                NULL|{NULL, 0233f4a6e9...|  true|{622201481, alwaz...|  PushEvent|
|{https://avatars....|2023-04-01T1

## 1. Identifying user location based on their GitHub profiles


#### Use GraphQL to fetch data for multiple users in one batched request, instead of making one REST request per user and constantly hitting the REST API rate limit
1. GraphQL test: creates a data subset, then constructs and executes a GraphQL query for user locations
2. Italian identification: uses Italian keywords to filter and display Italian users
- On a subset of 500 users, 3 Italians were identified

#### Step 1: Extract Unique Logins

In [30]:
from pyspark.sql.functions import col

# Condition that is true when the first character of the actor login is a digit
first_char_is_digit = col('actor.login').substr(1, 1).rlike('[0-9]')

# Drop those events, then reduce what is left to the distinct set of logins
logins_df = df_spark.filter(~first_char_is_digit).select('actor.login').distinct()

# Bring the distinct logins back to the driver as a plain Python list
logins = [record['login'] for record in logins_df.collect()]


#### Step 2: Batched GraphQL Requests

In [33]:
import requests

def sanitize_for_alias(username):
    """Turn a login into a string usable as a GraphQL alias.

    A GraphQL name may only contain ASCII letters, digits and underscores and
    must not start with a digit. Every other character is replaced by ``_``.
    A leading underscore is added when the result would otherwise be empty or
    start with a digit.

    Note that different logins can map to the same alias (for example
    ``a-b`` and ``a_b``); callers that batch such logins together get only one
    entry for them.

    Args:
        username: The login to convert.

    Returns:
        The alias string.
    """
    # str.isalnum() alone also accepts non-ASCII letters and digits, which are
    # not valid in a GraphQL name, hence the additional isascii() check.
    alias = ''.join(
        ch if (ch.isascii() and ch.isalnum()) else '_' for ch in username
    )
    if not alias or alias[0].isdigit():
        alias = '_' + alias
    return alias

def construct_query(logins):
    """Build one GraphQL query that asks for the location of every login.

    Each login becomes its own aliased ``user(login: ...)`` field, with the
    alias produced by ``sanitize_for_alias``. The response can therefore be
    read back per user.

    Args:
        logins: Iterable of login strings.

    Returns:
        The query document as a string.
    """
    # Local import: only needed here, to emit the login as a quoted and
    # escaped string literal. A login containing a quote or backslash would
    # otherwise break (or alter) the query.
    import json

    query_parts = [
        f'''
        {sanitize_for_alias(login)}: user(login: {json.dumps(login)}) {{
            location
        }}
        ''' for login in logins
    ]
    return '{' + ''.join(query_parts) + '}'

def fetch_data(query, github_token, timeout=60):
    """Send ``query`` to the GitHub GraphQL endpoint and return its ``data``.

    Args:
        query: GraphQL query document.
        github_token: Token sent as bearer authorization.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The value of the ``data`` key of the JSON response.

    Raises:
        Exception: If the HTTP status is not 200, or if the response carries
            no usable ``data`` (key missing or null).
    """
    headers = {
        'Authorization': 'bearer ' + github_token,
        'Content-Type': 'application/json'
    }
    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.post(
        'https://api.github.com/graphql',
        json={'query': query},
        headers=headers,
        timeout=timeout,
    )

    if response.status_code != 200:
        raise Exception(f"Request failed with status code {response.status_code}: {response.text}")

    response_json = response.json()

    # A null "data" value is treated like a missing one, so the caller never
    # receives None where it expects a mapping.
    data = response_json.get('data') if isinstance(response_json, dict) else None
    if data is None:
        # One formatted message rather than a tuple of exception arguments.
        raise Exception(f"Data key missing or null in response: {response_json}")
    return data

def batched_fetch_data(logins, batch_size, github_token):
    """Fetch user data for ``logins`` in chunks of ``batch_size`` logins.

    One query is built and sent per chunk. The result maps each original
    login to the entry returned under its sanitized alias; logins whose alias
    is absent from a response are left out.
    """
    results = {}

    for start in range(0, len(logins), batch_size):
        chunk = logins[start:start + batch_size]
        payload = fetch_data(construct_query(chunk), github_token)

        # The response is keyed by alias; translate back to the real login.
        for login in chunk:
            key = sanitize_for_alias(login)
            if key not in payload:
                continue
            results[login] = payload[key]

    return results

# Number of logins bundled into each GraphQL request (2000 here).
# NOTE(review): whether the API accepts this many aliased lookups per query is
# not established in this notebook — confirm against GitHub's GraphQL limits.
batch_size = 2000
all_user_data = batched_fetch_data(logins, batch_size, github_token)


### Step 3: Create a DataFrame from GraphQL Results and Join

In [56]:
from pyspark.sql import Row

# Convert the user data dictionary to a list of Rows; a user without data
# (falsy value) gets a None location.
rows = [Row(login=key, location=value['location'] if value else None) for key, value in all_user_data.items()]

# Create a Spark DataFrame from the Rows. The schema is given explicitly so
# Spark does not have to infer column types from the data, which cannot work
# when there are no rows or when every location is None.
all_users_df = spark.createDataFrame(rows, schema="login string, location string")
all_users_df.show()


+-------------------+--------------------+
|              login|            location|
+-------------------+--------------------+
|            iMariee|                NULL|
|        edu-rinaldi|              Berlin|
|Chaitanyashrimali21|                NULL|
|        shubhamkr83|           New Delhi|
|         FiresoftPH|                NULL|
|            Pamavoc|               Paris|
|            Ituking|                NULL|
|               cpba|                NULL|
|        Trishna1234|             Kolkata|
|         Pablotramp|                NULL|
|            dev0652|Ukraine, best cou...|
|           Hritik95|                NULL|
|        YannickMath|                NULL|
|         Nanonikich|                NULL|
|  Sneha-Mittal88293|Bari , Dholpur (R...|
|           shweepps|                NULL|
|             vr33ni|                NULL|
|         FruiteePro|               Earth|
|      adviti-mishra|                NULL|
|             gfmota|       São Paulo -SP|
+----------

In [61]:
# Bring the (login, location) Spark DataFrame `all_users_df` to the driver as
# a pandas DataFrame
pandas_df = all_users_df.toPandas()
# Save the lookup results to disk; no `index` argument is passed, so the
# pandas default applies.
pandas_df.to_csv("data/user_location.csv")

#### Step 4: Select only the Italian users

In [76]:
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from fuzzywuzzy import process

# Lower-case keywords (country and city names) used to recognise Italian locations
italian_keywords = ["rome", "milan", "italy", "florence", "venice", "naples", "turin", "palermo", "genoa", "bologna"]


@udf(BooleanType())
def is_italian_location_udf(location):
    """Return True when a profile location looks Italian.

    A location matches when its lower-cased text contains one of
    ``italian_keywords`` as a substring, or when the best fuzzy match against
    the keywords scores above 90. Empty or missing locations never match.

    Because the first test is a plain substring test, it also accepts
    non-Italian places that contain a keyword (e.g. "Venice, CA").
    """
    if not location:
        return False

    lowered = location.lower()
    for keyword in italian_keywords:
        if keyword in lowered:
            return True

    # Fall back to fuzzy matching to tolerate typos
    _, score = process.extractOne(lowered, italian_keywords)
    return score > 90


# Keep only the rows whose location is classified as Italian
italian_df_spark = all_users_df.where(is_italian_location_udf(col("location")))

# `all_users_df` is flat (login, location), so the result can be shown directly.
italian_df_spark.show(5)


+--------------+------------+
|         login|    location|
+--------------+------------+
|    crondaemon|Turin, Italy|
| CristianCosci|       Italy|
|LudovicoCIocci|       Italy|
|     FabTheZen| Bari, Italy|
|     Manu098vm|       Italy|
+--------------+------------+
only showing top 5 rows



In [77]:
# Number of users whose profile location was classified as Italian
italian_df_spark.count()

446

In [78]:
from pyspark.sql.functions import count

# Frequency table of the matched location strings, most frequent first
italian_df_spark.groupBy("location").agg(count("location").alias("count")).orderBy("count", ascending=False).show()


+--------------+-----+
|      location|count|
+--------------+-----+
|         Italy|  168|
|  Milan, Italy|   29|
|   Rome, Italy|   20|
|          Rome|   18|
|         Milan|   13|
|  Turin, Italy|   11|
|        Milano|    8|
|Bologna, Italy|    8|
|         Turin|    6|
| Verona, Italy|    6|
|       Bologna|    5|
|   Bari, Italy|    4|
|Bergamo, Italy|    4|
|Brescia, Italy|    4|
|         italy|    3|
|Palermo, Italy|    3|
| Milano, Italy|    3|
|     Milan, IT|    3|
|    Venice, CA|    3|
|Treviso, Italy|    3|
+--------------+-----+
only showing top 20 rows



In [82]:
# Inspect the fields nested inside the `actor` struct column
df_spark.select("actor").schema

StructType([StructField('actor', StructType([StructField('avatar_url', StringType(), True), StructField('display_login', StringType(), True), StructField('gravatar_id', StringType(), True), StructField('id', LongType(), True), StructField('login', StringType(), True), StructField('url', StringType(), True)]), True)])

---------------------------------------

In [83]:
from pyspark.sql import functions as F

# Logins of the users classified as Italian
italian_logins = italian_df_spark.select("login")

# Keep only the events whose actor login appears among the Italian logins
# (inner join), then retain just the `actor` struct.
# NOTE(review): the recorded run of this cell failed with "Cannot call methods
# on a stopped SparkContext". Presumably `df_spark` was created before the
# Spark session was stopped and rebuilt — confirm, and re-run the cell that
# reads `df_spark` after any session restart.
login_matches = df_spark["actor"]["login"] == italian_logins["login"]
filtered_actors_df = df_spark.join(italian_logins, on=login_matches, how="inner").select("actor")

# Display the result
filtered_actors_df.show(5)


Py4JJavaError: An error occurred while calling o473.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
java.lang.reflect.Constructor.newInstance(Unknown Source)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Unknown Source)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
java.lang.reflect.Constructor.newInstance(Unknown Source)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Unknown Source)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2707)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.$anonfun$apply$1(CoalesceShufflePartitions.scala:60)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions$$Lambda$3689/5975950.apply$mcI$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:57)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:33)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$optimizeQueryStage$1(AdaptiveSparkPlanExec.scala:163)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3686/1030639414.apply(Unknown Source)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.optimizeQueryStage(AdaptiveSparkPlanExec.scala:162)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:572)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:522)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:561)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3672/1105860076.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$75/704106237.apply(Unknown Source)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:561)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:561)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3672/1105860076.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$75/704106237.apply(Unknown Source)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:561)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:561)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3672/1105860076.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$75/704106237.apply(Unknown Source)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:561)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:261)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3670/748390478.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:256)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:401)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset$$Lambda$3217/543278671.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.Dataset$$Lambda$3319/1897177885.apply(Unknown Source)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset$$Lambda$3218/125248680.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$3226/1208694368.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$3219/1317167135.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
