References to the documentation describing the structure of the JSON objects:  [Virustotal Domains Object](https://docs.virustotal.com/reference/domains-object)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
import os

In [2]:
os.listdir("bronze/virus_total")

['virus_total_batch_1.ndjson']

# Load VirusTotal JSON files into Spark DataFrame
### last_analysis_results
Represents individual antivirus engine results.
- Key = engine name.

- Values: detection method, engine name, category (malicious, harmless, etc.), and result.

### last_analysis_stats
Overall detection statistics. Counts how many engines flagged the sample as: malicious, suspicious, undetected, harmless, or timeout.

### Main schema:

- last_https_certificate_date → Last SSL certificate date.

- last_analysis_date → Last VirusTotal analysis date.

- whois → WHOIS registration information.

- reputation → Reputation score assigned by VirusTotal.

- jarm → SSL/TLS fingerprint hash.

- last_analysis_stats → Aggregated detection statistics.

- last_analysis_results → Per-engine detection results.

- tld → Top-level domain (.com, .org, etc.).

- registrar → Domain registrar entity.

- popularity_ranks → Rankings from providers (e.g., Alexa, Cisco Umbrella).

- tags → Tags assigned to the resource.

- categories → Classification categories (e.g., phishing, malware).

- total_votes → Community votes (harmless vs malicious).

In [3]:
spark = SparkSession.builder.appName("VirusTotalIngest").getOrCreate()

from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, IntegerType,
    ArrayType, MapType
)

# last_analysis_results schema
last_analysis_results_schema = MapType(
    StringType(),
    StructType([
        StructField("method", StringType(), True),
        StructField("engine_name", StringType(), True),
        StructField("category", StringType(), True),
        StructField("result", StringType(), True),
    ])
)

# last_analysis_stats schema
last_analysis_stats_schema = StructType([
    StructField("malicious", IntegerType(), True),
    StructField("suspicious", IntegerType(), True),
    StructField("undetected", IntegerType(), True),
    StructField("harmless", IntegerType(), True),
    StructField("timeout", IntegerType(), True),
])

# Main schema
schema = StructType([
    StructField("id", StringType(), True),
    StructField("file_extracted", StringType(), True),
    StructField("response", StructType([
        StructField("data", StructType([
            StructField("id", StringType(), True),
            StructField("type", StringType(), True),
            StructField("links", StructType([
                StructField("self", StringType(), True),
            ]), True),
            StructField("attributes", StructType([
                StructField("last_https_certificate_date", LongType(), True),
                StructField("last_analysis_date", LongType(), True),
                StructField("whois", StringType(), True),
                StructField("reputation", IntegerType(), True),
                StructField("jarm", StringType(), True),
                StructField("last_analysis_stats", last_analysis_stats_schema, True),
                StructField("last_analysis_results", last_analysis_results_schema, True),
                StructField("tld", StringType(), True),
                StructField("registrar", StringType(), True),
                StructField("last_dns_records_date", LongType(), True),
                StructField("popularity_ranks", MapType(
                    StringType(),
                    StructType([
                        StructField("rank", LongType(), True),
                        StructField("timestamp", LongType(), True),
                    ])
                ), True),
                StructField("last_modification_date", LongType(), True),
                StructField("expiration_date", LongType(), True),
                StructField("tags", ArrayType(StringType()), True),
                StructField("last_update_date", LongType(), True),
                StructField("categories", MapType(StringType(), StringType()), True),
                StructField("creation_date", LongType(), True),
                StructField("total_votes", StructType([
                    StructField("harmless", IntegerType(), True),
                    StructField("malicious", IntegerType(), True),
                ]), True),
            ]), True),
        ]), True),
    ]), True),
])

df = spark.read.schema(schema).json("bronze/virus_total/*.ndjson")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/07 10:47:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/07 10:47:06 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: bronze/virus_total/*.ndjson.
java.io.FileNotFoundException: File bronze/virus_total/*.ndjson does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.F

At a high level, this code flattens and restructures the nested VirusTotal JSON data into a cleaner, tabular format suitable for analysis in Spark. In short, it transforms the raw, deeply nested VirusTotal JSON into a flat schema with columns ready for querying, aggregations, and exporting

In [4]:
from pyspark.sql.functions import col, to_json, concat_ws

df_flat = df.select(
    col("id").alias("root_id"),
    col("file_extracted"),
    col("response.data.id").alias("domain_id"),
    col("response.data.type").alias("domain_type"),
    col("response.data.links.self").alias("domain_link"),
    
    # atributos simples
    col("response.data.attributes.last_https_certificate_date"),
    col("response.data.attributes.last_analysis_date"),
    col("response.data.attributes.whois"),
    col("response.data.attributes.reputation"),
    col("response.data.attributes.jarm"),
    col("response.data.attributes.tld"),
    col("response.data.attributes.registrar"),
    col("response.data.attributes.last_dns_records_date"),
    col("response.data.attributes.last_modification_date"),
    col("response.data.attributes.expiration_date"),
    col("response.data.attributes.last_update_date"),
    col("response.data.attributes.creation_date"),
    
    # flatten: last_analysis_stats
    col("response.data.attributes.last_analysis_stats.malicious").alias("stats_malicious"),
    col("response.data.attributes.last_analysis_stats.suspicious").alias("stats_suspicious"),
    col("response.data.attributes.last_analysis_stats.undetected").alias("stats_undetected"),
    col("response.data.attributes.last_analysis_stats.harmless").alias("stats_harmless"),
    col("response.data.attributes.last_analysis_stats.timeout").alias("stats_timeout"),
    
    # flatten: total_votes
    col("response.data.attributes.total_votes.harmless").alias("votes_harmless"),
    col("response.data.attributes.total_votes.malicious").alias("votes_malicious"),
    
    # mantenemos last_analysis_results como JSON string
    to_json(col("response.data.attributes.last_analysis_results")).alias("last_analysis_results"),
    
    # flatten extra: popularity_ranks, categories, tags
    to_json(col("response.data.attributes.popularity_ranks")).alias("popularity_ranks"),
    to_json(col("response.data.attributes.categories")).alias("categories"),
    concat_ws(",", col("response.data.attributes.tags")).alias("tags")
)

df_flat.printSchema()
df_flat.show(1, truncate=False)

root
 |-- root_id: string (nullable = true)
 |-- file_extracted: string (nullable = true)
 |-- domain_id: string (nullable = true)
 |-- domain_type: string (nullable = true)
 |-- domain_link: string (nullable = true)
 |-- last_https_certificate_date: long (nullable = true)
 |-- last_analysis_date: long (nullable = true)
 |-- whois: string (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- jarm: string (nullable = true)
 |-- tld: string (nullable = true)
 |-- registrar: string (nullable = true)
 |-- last_dns_records_date: long (nullable = true)
 |-- last_modification_date: long (nullable = true)
 |-- expiration_date: long (nullable = true)
 |-- last_update_date: long (nullable = true)
 |-- creation_date: long (nullable = true)
 |-- stats_malicious: integer (nullable = true)
 |-- stats_suspicious: integer (nullable = true)
 |-- stats_undetected: integer (nullable = true)
 |-- stats_harmless: integer (nullable = true)
 |-- stats_timeout: integer (nullable = true)
 |-- votes

25/09/07 10:47:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------------------------+-------------------------------------------------------+---------------------------+-----------+---------------------------------------------------------------------+---------------------------+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

This block of code takes the flattened VirusTotal dataset and defines the final labeled dataset with a new target column.

In [5]:
from pyspark.sql import functions as F

df_final = df_flat.withColumn(
    "threat_status",
    F.when(F.col("file_extracted").contains("black_list"), "malicious")
     .when(F.col("file_extracted").contains("white_list"), "whitelist")
     .otherwise("unknown")
)
df_final = df_final.toPandas()
df_final = df_final.drop(['file_extracted'], axis=1)
df_final.columns

Index(['root_id', 'domain_id', 'domain_type', 'domain_link',
       'last_https_certificate_date', 'last_analysis_date', 'whois',
       'reputation', 'jarm', 'tld', 'registrar', 'last_dns_records_date',
       'last_modification_date', 'expiration_date', 'last_update_date',
       'creation_date', 'stats_malicious', 'stats_suspicious',
       'stats_undetected', 'stats_harmless', 'stats_timeout', 'votes_harmless',
       'votes_malicious', 'last_analysis_results', 'popularity_ranks',
       'categories', 'tags', 'threat_status'],
      dtype='object')

In [15]:
print(df_final['tags'].unique())
print()
print(df_final['tags'].info())

['' 'nxdomain,potential-c2' 'nxdomain' 'dga' 'hex']

<class 'pandas.core.series.Series'>
RangeIndex: 499 entries, 0 to 498
Series name: tags
Non-Null Count  Dtype 
--------------  ----- 
499 non-null    object
dtypes: object(1)
memory usage: 4.0+ KB
None


In [7]:
df_final = df_final.drop(
    ['domain_type', 'jarm',
     'domain_id', 'domain_link', 'last_https_certificate_date',
     'last_analysis_date', 'last_dns_records_date']
    , axis=1)

df_final.head()

Unnamed: 0,root_id,whois,reputation,tld,registrar,last_modification_date,expiration_date,last_update_date,creation_date,stats_malicious,...,stats_undetected,stats_harmless,stats_timeout,votes_harmless,votes_malicious,last_analysis_results,popularity_ranks,categories,tags,threat_status
0,0.client-channel.google.com,Creation Date: 1997-09-15T04:00:00Z\nCreation ...,0,com,MarkMonitor Inc.,1754931941,1852517000.0,1722565000.0,874306800.0,0,...,30,64,0,0,0,"{""Acronis"":{""method"":""blacklist"",""engine_name""...","{""Cisco Umbrella"":{""rank"":992903,""timestamp"":1...",{},,whitelist
1,17track.net,Creation Date: 2011-03-09T01:44:43Z\nDNSSEC: u...,0,net,Alibaba Cloud Computing Ltd. d/b/a HiChina (ww...,1756732487,1867711000.0,1746083000.0,1299635000.0,0,...,29,65,0,0,0,"{""Acronis"":{""method"":""blacklist"",""engine_name""...","{""Majestic"":{""rank"":9502,""timestamp"":175665108...","{""BitDefender"":""onlineshop"",""Sophos"":""general ...",,whitelist
2,1drv.com,Creation Date: 2013-08-05T18:33:50+0000\nCreat...,-58,com,MarkMonitor Inc.,1756734594,1785955000.0,1751628000.0,1375728000.0,0,...,29,65,0,0,4,"{""Acronis"":{""method"":""blacklist"",""engine_name""...","{""Statvoo"":{""rank"":20789,""timestamp"":168408348...","{""BitDefender"":""parked"",""Sophos"":""personal net...",,whitelist
3,1fichier.com,Admin City: REDACTED FOR PRIVACY\nAdmin Countr...,1,com,ONLINE SAS,1756735976,1765109000.0,1741239000.0,1260188000.0,1,...,29,64,0,1,0,"{""Acronis"":{""method"":""blacklist"",""engine_name""...","{""Majestic"":{""rank"":49174,""timestamp"":17566510...",{},,whitelist
4,1und1.de,Changed: 2017-05-11T09:15:31+02:00\nDomain: 1u...,1,de,,1756733769,,,,0,...,29,65,0,1,0,"{""Acronis"":{""method"":""blacklist"",""engine_name""...","{""Majestic"":{""rank"":10222,""timestamp"":17566510...",{},,whitelist


# Flattening and Cleaning JSON Columns: last_analysis_results & popularity_ranks

## last_analysis_results column
- last_analysis_results: <dictionary> result from URL scanners. dict with scanner name as key and a dict with notes/result from that scanner as value.
    - category: <string> normalised result. can be:
    - "harmless" (site is not malicious),
    - "undetected" (scanner has no opinion about this site),
    - "suspicious" (scanner thinks the site is suspicious),
    - "malicious" (scanner thinks the site is malicious).
    - engine_name: <string> complete name of the URL scanning service.
    - engine_version: <string> engine version value, in case it reports that data.
    - method: <string> type of service given by that URL scanning service (i.e. "blacklist").
    - result: <string> raw value returned by the URL scanner ("clean", "malicious", "suspicious", "phishing"). It may vary from scanner to scanner, hence the need for the "category" field for normalisation.

In [8]:
import json
json_formatted_str = json.dumps(df_final['last_analysis_results'][0], indent=2)
print(json_formatted_str)

"{\"Acronis\":{\"method\":\"blacklist\",\"engine_name\":\"Acronis\",\"category\":\"harmless\",\"result\":\"clean\"},\"0xSI_f33d\":{\"method\":\"blacklist\",\"engine_name\":\"0xSI_f33d\",\"category\":\"undetected\",\"result\":\"unrated\"},\"Abusix\":{\"method\":\"blacklist\",\"engine_name\":\"Abusix\",\"category\":\"harmless\",\"result\":\"clean\"},\"ADMINUSLabs\":{\"method\":\"blacklist\",\"engine_name\":\"ADMINUSLabs\",\"category\":\"harmless\",\"result\":\"clean\"},\"Axur\":{\"method\":\"blacklist\",\"engine_name\":\"Axur\",\"category\":\"undetected\",\"result\":\"unrated\"},\"Criminal IP\":{\"method\":\"blacklist\",\"engine_name\":\"Criminal IP\",\"category\":\"harmless\",\"result\":\"clean\"},\"AILabs (MONITORAPP)\":{\"method\":\"blacklist\",\"engine_name\":\"AILabs (MONITORAPP)\",\"category\":\"harmless\",\"result\":\"clean\"},\"AlienVault\":{\"method\":\"blacklist\",\"engine_name\":\"AlienVault\",\"category\":\"harmless\",\"result\":\"clean\"},\"alphaMountain.ai\":{\"method\":\"b

In this part of the code, we’re working with the last_analysis_results column, which is a JSON string containing results from many different antivirus engines for the same domain.

- Each engine has its own category and result.

- In practice, most of the engines tend to give the same value, so there’s a lot of repetition.

What the code does is:

1. Parse the JSON for each row.

2. Collect all unique category values into a Python list → stored in a new column called categories.

3. Collect all unique result values into a Python list → stored in a new column called results.

4. Drop the original last_analysis_results JSON column, since we’ve now summarized all its information in the two new columns.

Result: for each domain, you get a cleaned, deduplicated array of categories and results, instead of keeping a messy nested JSON.

It’s basically turning a “messy response from many engines” into two neat lists that are easy to work with

In [9]:
def get_categories_resume_from_last_analysis_results(json_data):
    data = json.loads(json_data)
    categories = {info["category"] for info in data.values()}
    categories = list(categories)
    return categories

def get_results_from_last_analysis_results(json_data):
    data = json.loads(json_data)
    results = {info["result"] for info in data.values()}
    results = list(results)
    return results


categories = get_categories_resume_from_last_analysis_results(df_final['last_analysis_results'][0])
results = get_results_from_last_analysis_results(df_final['last_analysis_results'][0])

In [10]:
df_final["categories"] = df_final.apply(lambda row: (get_categories_resume_from_last_analysis_results(row["last_analysis_results"])), axis=1)
df_final["results"] = df_final.apply(lambda row: (get_results_from_last_analysis_results(row["last_analysis_results"])), axis=1)

df_final = df_final.drop(['last_analysis_results'], axis=1)
print(df_final.columns)
df_final.head()

Index(['root_id', 'whois', 'reputation', 'tld', 'registrar',
       'last_modification_date', 'expiration_date', 'last_update_date',
       'creation_date', 'stats_malicious', 'stats_suspicious',
       'stats_undetected', 'stats_harmless', 'stats_timeout', 'votes_harmless',
       'votes_malicious', 'popularity_ranks', 'categories', 'tags',
       'threat_status', 'results'],
      dtype='object')


Unnamed: 0,root_id,whois,reputation,tld,registrar,last_modification_date,expiration_date,last_update_date,creation_date,stats_malicious,...,stats_undetected,stats_harmless,stats_timeout,votes_harmless,votes_malicious,popularity_ranks,categories,tags,threat_status,results
0,0.client-channel.google.com,Creation Date: 1997-09-15T04:00:00Z\nCreation ...,0,com,MarkMonitor Inc.,1754931941,1852517000.0,1722565000.0,874306800.0,0,...,30,64,0,0,0,"{""Cisco Umbrella"":{""rank"":992903,""timestamp"":1...","[undetected, harmless]",,whitelist,"[clean, unrated]"
1,17track.net,Creation Date: 2011-03-09T01:44:43Z\nDNSSEC: u...,0,net,Alibaba Cloud Computing Ltd. d/b/a HiChina (ww...,1756732487,1867711000.0,1746083000.0,1299635000.0,0,...,29,65,0,0,0,"{""Majestic"":{""rank"":9502,""timestamp"":175665108...","[undetected, harmless]",,whitelist,"[clean, unrated]"
2,1drv.com,Creation Date: 2013-08-05T18:33:50+0000\nCreat...,-58,com,MarkMonitor Inc.,1756734594,1785955000.0,1751628000.0,1375728000.0,0,...,29,65,0,0,4,"{""Statvoo"":{""rank"":20789,""timestamp"":168408348...","[undetected, harmless]",,whitelist,"[clean, unrated]"
3,1fichier.com,Admin City: REDACTED FOR PRIVACY\nAdmin Countr...,1,com,ONLINE SAS,1756735976,1765109000.0,1741239000.0,1260188000.0,1,...,29,64,0,1,0,"{""Majestic"":{""rank"":49174,""timestamp"":17566510...","[malicious, undetected, harmless]",,whitelist,"[clean, malicious, unrated]"
4,1und1.de,Changed: 2017-05-11T09:15:31+02:00\nDomain: 1u...,1,de,,1756733769,,,,0,...,29,65,0,1,0,"{""Majestic"":{""rank"":10222,""timestamp"":17566510...","[undetected, harmless]",,whitelist,"[clean, unrated]"


## popularity_ranks column

- popularity_ranks: <dictionary> domain's position in popularity ranks such as Alexa, Quantcast, Statvoo, etc. Every dictionary contains the following subfields:
    - rank: <integer> rank position.
    - timestamp: <integer> UTC timestamp when the rank was ingested.

In [11]:
import json
json_formatted_str = json.dumps(df_final['popularity_ranks'][0], indent=2)
json_formatted_str

'"{\\"Cisco Umbrella\\":{\\"rank\\":992903,\\"timestamp\\":1754923098}}"'

In [12]:
def get_popularity_ranks(json_ranks):
    data = json.loads(json_ranks)
    ranks = {row["rank"] for row in data.values()}
    return list(ranks)

get_popularity_ranks(df_final['popularity_ranks'][10])

[1313, 111457, 5000, 3214, 10174]

Here we’re working with the popularity_ranks column, which is a JSON string containing rankings of a domain from different sources (like Alexa, Cisco Umbrella, etc.).

- Each source gives its own rank, and there might be duplicates or multiple ranks across sources.

What this code does:

1. Parse the JSON for each row.

2. Extract all the rank values and collect them into a set to remove duplicates.

3. Convert the set to a list and save it in a new column called ranks.

4. Drop the original popularity_ranks JSON column, since all the useful info is now summarized in ranks.

Result: for each domain, you now have a clean array of ranks from all sources, easy to work with in analysis, instead of keeping the messy nested JSON.

In [13]:
df_final["ranks"] = df_final.apply(lambda row: (get_popularity_ranks(row["popularity_ranks"])), axis=1)

df_final = df_final.drop(['popularity_ranks'], axis=1)
print(df_final.columns)
df_final.head()

Index(['root_id', 'whois', 'reputation', 'tld', 'registrar',
       'last_modification_date', 'expiration_date', 'last_update_date',
       'creation_date', 'stats_malicious', 'stats_suspicious',
       'stats_undetected', 'stats_harmless', 'stats_timeout', 'votes_harmless',
       'votes_malicious', 'categories', 'tags', 'threat_status', 'results',
       'ranks'],
      dtype='object')


Unnamed: 0,root_id,whois,reputation,tld,registrar,last_modification_date,expiration_date,last_update_date,creation_date,stats_malicious,...,stats_undetected,stats_harmless,stats_timeout,votes_harmless,votes_malicious,categories,tags,threat_status,results,ranks
0,0.client-channel.google.com,Creation Date: 1997-09-15T04:00:00Z\nCreation ...,0,com,MarkMonitor Inc.,1754931941,1852517000.0,1722565000.0,874306800.0,0,...,30,64,0,0,0,"[undetected, harmless]",,whitelist,"[clean, unrated]",[992903]
1,17track.net,Creation Date: 2011-03-09T01:44:43Z\nDNSSEC: u...,0,net,Alibaba Cloud Computing Ltd. d/b/a HiChina (ww...,1756732487,1867711000.0,1746083000.0,1299635000.0,0,...,29,65,0,0,0,"[undetected, harmless]",,whitelist,"[clean, unrated]","[38944, 865, 10000, 1532, 9502]"
2,1drv.com,Creation Date: 2013-08-05T18:33:50+0000\nCreat...,-58,com,MarkMonitor Inc.,1756734594,1785955000.0,1751628000.0,1375728000.0,0,...,29,65,0,0,4,"[undetected, harmless]",,whitelist,"[clean, unrated]","[1000, 295850, 1873, 20789, 44693]"
3,1fichier.com,Admin City: REDACTED FOR PRIVACY\nAdmin Countr...,1,com,ONLINE SAS,1756735976,1765109000.0,1741239000.0,1260188000.0,1,...,29,64,0,1,0,"[malicious, undetected, harmless]",,whitelist,"[clean, malicious, unrated]","[20000, 647, 257267, 49174, 7513]"
4,1und1.de,Changed: 2017-05-11T09:15:31+02:00\nDomain: 1u...,1,de,,1756733769,,,,0,...,29,65,0,1,0,"[undetected, harmless]",,whitelist,"[clean, unrated]","[5000, 5963, 23995, 10222]"


# Save dataframe
Save silver dataset for VirusTotal

In [14]:
df_final.to_csv('silver/virus_total/virus_total.csv', sep=';', index=False)