# Process links in video descriptions

This notebook processes links in video descriptions. The goal is to extract links that may be related to sponsors.

In [1]:
import re
import requests
from cachetools import cached, TTLCache
from pyspark.sql.functions import col, udf, explode

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Setup a cache for the requests
cache = TTLCache(maxsize=1024, ttl=86400)

In [2]:
USE_SUBDATASETS = False

PATH_METADATAS_SRC = '../data/subdata/yt_metadata_en_sub' if USE_SUBDATASETS else '../data/yt_metadata_en.jsonl.gz'

## Extract links from video descriptions

Firstly, we extract links using regular expressions. We perform a first filtering to remove urls from sites that are generally not related to sponsors such as Youtube, Twitter, Facebook, Wikipedia, Discord, etc...

In [3]:
PATH_METADATAS_URLS_DST = '../data/generated/yt_metadata_en_urls.parquet'
PATH_METADATAS_NO_URLS_DST = '../data/generated/yt_metadata_en_no_urls.parquet'

In [5]:
# Load the invalid URLs
PATH_INVALID_URLS = '../data/invalid_urls.csv'

invalid_urls_reg = []
with open(PATH_INVALID_URLS, 'r') as f:
    for line in f:
        invalid_urls_reg.append(fr"(?i)({line.strip()})")

In [6]:
def get_urls(description):
    url_regex = r"(https?:\/\/)?(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)"
    urls = []
    for line in description.split("\n"):
        if re.search(url_regex, line):
            urlss = [x.group() for x in re.finditer(url_regex, line)] # Find all urls in the line
            urlss = [x for x in urlss if not any([re.search(reg, x) for reg in invalid_urls_reg])] # Filter out invalid urls
            urls.extend(urlss)
    urls = set(urls) # Remove duplicates
    return urls

# Test the function on basic examples
assert(not get_urls('There is no link.'))
assert(get_urls('This is a link: www.special.com/') == {'www.special.com/'})
assert(get_urls('This is a link: https://www.special.com/ and this is another link: https://www.youtube.com/watch?v=2') == {'https://www.special.com/'})
assert(get_urls('This is a link: https://www.special.com/ \n and this is another link: www.special.com/') == {'www.special.com/', 'https://www.special.com/'})

### Write sponsored videos

In [7]:
metadatas = spark.read.json(PATH_METADATAS_SRC)

# Get urls and count of urls in the description
get_urls_udf = udf(lambda x: get_urls(x))
metadatas = metadatas.withColumn('urls', get_urls_udf(col('description')))
len_udf = udf(lambda x: len(x))
metadatas = metadatas.withColumn('urls_count', len_udf(col('urls')))

# Drop unneeded columns and take only the videos with at least one url
metadatas_urls = metadatas.drop('description').filter(metadatas.urls_count > 0)

metadatas_urls.write.parquet(PATH_METADATAS_URLS_DST, mode="overwrite")

Py4JJavaError: An error occurred while calling o66.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (host.docker.internal executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketException: Connection reset by peer
	at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:420)
	at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440)
	at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826)
	at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
	at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:307)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:245)
	... 42 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:655)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.net.SocketException: Connection reset by peer
	at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:420)
	at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440)
	at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:826)
	at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
	at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:295)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:307)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)


In [23]:
metadatas_urls.select('title', 'urls', 'urls_count').show(5)

+--------------------+--------------------+----------+
|               title|                urls|urls_count|
+--------------------+--------------------+----------+
|Gravity Falls The...|[http://www.zolta...|         8|
|DIY | VALENTINES ...|[iamDonareen@Gmai...|         1|
|Snowboarder hit b...|[http://bit.ly/Ne...|         7|
|Shah Rukh Khan Co...|[http://www.bolly...|         2|
|How Well Do You K...|[http://www.bolly...|         2|
+--------------------+--------------------+----------+
only showing top 5 rows



### Write non-sponsored videos

In [25]:
metadatas = spark.read.json(PATH_METADATAS_SRC)

# Get urls and count of urls in the description
get_urls_udf = udf(lambda x: get_urls(x))
metadatas = metadatas.withColumn('urls', get_urls_udf(col('description')))
len_udf = udf(lambda x: len(x))
metadatas = metadatas.withColumn('urls_count', len_udf(col('urls')))

# Drop unneeded columns and take only the videos with at least one url
metadatas_no_urls = metadatas.drop('description').filter(metadatas.urls_count == 0)

metadatas_no_urls.write.parquet(PATH_METADATAS_NO_URLS_DST, mode="overwrite")

In [26]:
metadatas_no_urls.select('title', 'urls', 'urls_count').show(5)

+--------------------+----+----------+
|               title|urls|urls_count|
+--------------------+----+----------+
|LOLO #finaLDance ...|  []|         0|
|Fresh Revelations...|  []|         0|
|Uncut - Grand Sta...|  []|         0|
|Boardwalk Bar Cra...|  []|         0|
|SKUNK ANANSIE "Be...|  []|         0|
+--------------------+----+----------+
only showing top 5 rows



## Resolve Bitly links (TODO)

We resolve Bitly links to get the original url. This is done using the Bitly API.

In [27]:
BITLY_API_KEY = '6eb2a9c9ec5950c276bf91b89ef2b1f229408807'

PATH_METADATAS_BITLY_DST = '../data/generated/yt_metadata_en_bitly.parquet'

In [None]:
# @cached(cache)
# def resolve_app_adjust_url(url, debug_mode=False):
#     if 'app.adjust.com' not in url:
#         return url
    
#     if debug_mode: print(f'Resolving app adjust url: {url}')
#     try:
#         response = requests.get(url)
        
#         if response.status_code == 200:
#             soup = bs4.BeautifulSoup(response.text, 'html.parser')
#             new_url = soup.find('a', class_='product-header__title app-header__title')
#             if debug_mode: print(f'Resolved adjust.com url: {new_url}')
#             return new_url if new_url is not None else url
#         else:
#             if debug_mode: print(f'Could not resolve adjust.com url: {url}')
#             return url

#     except Exception as e:
#         print(f'Could not resolve url {url}: {e}')
#         return url

In [28]:
@cached(cache)
def resolve_bitly_url(url, debug_mode=False):
    if 'bit.ly' not in url:
        return url

    # Remove http:// or https:// from the url
    url = url.replace('http://', '').replace('https://', '')

    if debug_mode: print(f'Resolving bit.ly url {url}...')
    try:
        # Post a request via the bit.ly API
        response = requests.post('https://api-ssl.bitly.com/v4/expand', headers={'Authorization': f'Bearer {BITLY_API_KEY}'}, json={'bitlink_id': url})

        # Retrieve the long url from the response
        if response.status_code == 200:
            new_url = response.json()['long_url']
            if debug_mode: print(f'\tResolved bit.ly url: {url} -> {new_url}')

            #if re.search(r"app.adjust.com", new_url):
            #    return resolve_app_adjust_url(new_url)
                    
            return new_url
        else:
            if debug_mode: print(f'\tCould not resolve bit.ly url: {url}, status code: {response.status_code}')
            return url

    except Exception as e:
        print(f'Could not resolve url {url}: {e}')
        return url

In [32]:
metadatas_urls = spark.read.parquet(PATH_METADATAS_URLS_DST)
resolve_bitly_url_udf = udf(lambda x: resolve_bitly_url(x))
metadatas_bitly = metadatas_urls.withColumn('urls', explode(col('urls')))
metadatas_bitly.write.parquet(PATH_METADATAS_BITLY_DST, mode="overwrite")

AnalysisException: cannot resolve 'explode(urls)' due to data type mismatch: input to function explode should be array or map type, not string;
'Project [categories#962, channel_id#963, crawl_date#964, dislike_count#965, display_id#966, duration#967L, like_count#968, tags#969, title#970, upload_date#971, view_count#972, explode(urls#973) AS urls#988, urls_count#974]
+- Relation [categories#962,channel_id#963,crawl_date#964,dislike_count#965,display_id#966,duration#967L,like_count#968,tags#969,title#970,upload_date#971,view_count#972,urls#973,urls_count#974] parquet
