In [None]:
from datetime import datetime

import pandas
from dask.diagnostics import ProgressBar
from tqdm import tqdm

from common.schemas.pyarrow_schema import schema

from common.storage.azure_file_storage import AzureFileStorageAdapter

# Dask progress reporting: a console ProgressBar and a tqdm-based callback are both
# registered globally.  NOTE(review): with both registered, every dask computation
# likely renders two progress bars — keep only one if that is the case.
pbar = ProgressBar()
pbar.register()

# Enables DataFrame.progress_apply (used by later cells).  A single call is enough:
# the earlier bare tqdm.pandas() was immediately replaced by this one.
tqdm.pandas(desc="global")

from tqdm.dask import TqdmCallback

cb = TqdmCallback(desc="global")
cb.register()

# Remote file system used for the parquet datasets and uploaded images
# (presumably Azure-backed, per the adapter name — TODO confirm).
file_system = AzureFileStorageAdapter('data').get_file_storage()

In [None]:
%%time

# Subreddits to scrape, grouped loosely by theme.  Each one also appears in some
# `sources[*]["data"]` list further down; a subreddit missing there would get an
# empty model label from add_source.
subs = [
	"SFWRedheads", "sfwpetite", "SFWNextDoorGirls",
	"trippinthroughtime", "fatsquirrelhate", "itookapicture", "memes",
	"CityPorn", "EarthPorn", "spaceporn",
	"realasians", "KoreanHotties", "prettyasiangirls", "AsianOfficeLady",
	"mildlypenis",
	"AsianInvasion",
	"sexygirls", "PrettyGirls", "gentlemanboners", "hotofficegirls", "tightdresses", "DLAH",
]

In [None]:
%%time

# Previously processed posts, indexed by submission id so later cells can skip posts
# that were already collected.
extant_data = pandas.read_parquet(
	"data/processed_raw_data.parquet",
	engine='pyarrow',
	filesystem=file_system,
	schema=schema,
).set_index('id')
display(extant_data)

In [None]:
# Curated dataset stored at data/parquet/back.parquet on the remote file system.
curated_data = pandas.read_parquet(
	"data/parquet/back.parquet",
	engine='pyarrow',
	filesystem=file_system,
	schema=schema,
)
display(curated_data.shape)
display(curated_data)

In [None]:
# TODO: Merge curated data with extant data and update extant data

In [None]:
%%time

import os

import praw

# Credentials are read from the environment instead of being hard-coded in the
# notebook.  NOTE(review): the client id/secret previously committed in this cell
# should be considered leaked and rotated.
# NOTE(review): the default user agent contains %(...)s placeholders; as far as I know
# PRAW only interpolates those inside praw.ini, not in keyword arguments — set
# REDDIT_USER_AGENT to a concrete value.
reddit: praw.Reddit = praw.Reddit(
	client_id=os.environ["REDDIT_CLIENT_ID"],
	client_secret=os.environ["REDDIT_CLIENT_SECRET"],
	user_agent=os.environ.get(
		"REDDIT_USER_AGENT",
		'script:%(bot_name)s:v%(bot_version)s (by /u/%(bot_author)s)',
	),
)

In [None]:
from praw.models import ListingGenerator
import pandas as pd
import os

# Ensure every subreddit has a scratch directory for its per-post parquet files.
for sub in tqdm(subs, total=len(subs), desc="Creating Temp Dir For Subs..."):
	temp_dir_path = f"temp/{sub}"
	# exist_ok makes this idempotent across re-runs.  The former
	# `existing_data = pd.read_parquet(temp_dir_path)` result (and the `posts` list)
	# were never used, and reading a freshly created empty directory may fail.
	os.makedirs(temp_dir_path, exist_ok=True)

In [None]:
def write_log_message(submission_id: str, subreddit: str, message: str, exception: Exception) -> str:
	"""Format one tab-separated log line (the caller writes it).

	Columns, in order: local timestamp as ``%Y-%m-%d %H:%M:%S``, subreddit,
	submission id, message, and the string form of ``exception``.  The returned
	line ends with a newline.
	"""
	timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
	columns = (timestamp, subreddit, submission_id, message, exception)
	return "\t".join(map(str, columns)) + "\n"

In [None]:
def _author_name(submission) -> str:
	"""Return the submission author's name, or 'Unknown' when it cannot be resolved."""
	# Any failure (e.g. no author object) falls back to 'Unknown', as before.
	try:
		return submission.author.name
	except Exception:
		return 'Unknown'


def _post_record(submission, subreddit_name: str) -> dict:
	"""Build the single-row record written to the temp parquet file for a submission."""
	return {
		'id': submission.id,
		'subreddit': subreddit_name,
		'author': _author_name(submission),
		'title': submission.title,
		'caption': '',
		'hash': '',
		'permalink': submission.permalink,
		'original_url': submission.url,
		'image_name': '',
		'path': '',
		'thumbnail_path': '',
		'exists': False,
		'curated': False,
		# NOTE(review): capitalised 'Tags' here, while the assembly cell fills a 'tags'
		# column — confirm which name the shared schema expects.
		'Tags': [''],
	}


# Ids already in the processed dataset; a set gives O(1) membership per post instead
# of scanning the index array each time.
known_ids = set(extant_data.index)

with open('log.txt', 'a') as f:
	for time_filter in ['week', 'day']:
		for sub in tqdm(subs, desc=f"{time_filter}", total=len(subs)):
			temp_dir_path = f"temp/{sub}"
			try:
				subreddit_stream: ListingGenerator = reddit.subreddit(display_name=sub).top(limit=100, time_filter=time_filter)
				submissions = list(subreddit_stream)
				for submission in tqdm(submissions, total=len(submissions), desc=f"Posts - {sub} - {time_filter}"):
					if submission is None or submission.id in known_ids:
						continue
					try:
						pd.DataFrame([_post_record(submission, sub)]).to_parquet(f"{temp_dir_path}/{submission.id}.parquet")
					except Exception as e:
						f.write(write_log_message(submission.id, sub, "Error Writing Post", e))
			except Exception as e:
				# This can fire before any submission was fetched: the old handler read
				# `submission.id` here, raising NameError (or logging a stale id from a
				# previous subreddit), so no id is logged for subreddit-level failures.
				f.write(write_log_message("", sub, "Error Getting Posts For SubReddit", e))

In [None]:
%%time

# Combine every per-post parquet file written by the scraper into one frame.
# DataFrame.append was removed in pandas 2.0 (and was quadratic inside a loop), so
# the per-subreddit frames are collected first and concatenated once.
temp_frames = [pandas.read_parquet(f"temp/{sub}", schema=schema, engine='pyarrow') for sub in subs]
temp_data = pandas.concat(temp_frames, ignore_index=True)

# Reset the derived columns with whole-column assignments instead of a per-cell
# .loc loop over iterrows (slow, and ambiguous when the assigned value is a list).
temp_data['image_name'] = temp_data['id'] + ".jpg"
temp_data['path'] = ""
temp_data['hash'] = ""
temp_data['caption'] = ""
temp_data['model'] = ""
temp_data['accept'] = False
# One independent [""] list per row, mirroring the scraper's placeholder tags value.
temp_data['tags'] = [[""] for _ in range(len(temp_data))]

# Index by id while keeping the id column for the isin() filter below.
temp_data = temp_data.set_index('id', drop=False)

display(temp_data.shape)

display(temp_data)

In [None]:
%%time

# Keep only posts not already in the processed dataset.  `.copy()` makes this an
# independent frame: later cells assign new columns to it, which on a bare boolean
# slice of temp_data raises SettingWithCopyWarning.
filtered = temp_data[~temp_data['id'].isin(extant_data.index)].copy()

display(filtered.shape)

display(filtered)

In [None]:
%%time

# Model name -> subreddits it covers; add_source uses this to fill each post's
# `model`.  Written as (name, subreddits) pairs and expanded into the same list of
# {"name": ..., "data": [...]} records.
# NOTE(review): "SWFPetite" looks like a transposition of "SFW"; it is kept verbatim
# because it is the stored model label.
sources = [
	{"name": model_name, "data": subreddit_names}
	for model_name, subreddit_names in (
		("CityDiffusion", ["CityPorn"]),
		("NatureDiffusion", ["EarthPorn"]),
		("CosmicDiffusion", ["spaceporn"]),
		("ITAPDiffusion", ["itookapicture"]),
		("MemeDiffusion", ["memes"]),
		("TTTDiffusion", ["trippinthroughtime"]),
		("WallStreetDiffusion", ["wallstreetbets"]),
		("SexyDiffusion", ["selfies", "Amicute", "amihot", "AmIhotAF", "HotGirlNextDoor"]),
		("FatSquirrelDiffusion", ["fatsquirrelhate"]),
		("CelebrityDiffusion", ["celebrities"]),
		("OldLadyDiffusion", ["oldladiesbakingpies"]),
		("SWFPetite", ["sfwpetite"]),
		("SFWMilfs", ["cougars_and_milfs_sfw"]),
		("RedHeadDiffusion", ["SFWRedheads"]),
		("NextDoorGirlsDiffusion", ["SFWNextDoorGirls"]),
		("SexyAsianDiffusion", ["realasians", "KoreanHotties", "prettyasiangirls", "AsianOfficeLady", "AsianInvasion"]),
		("MildlyPenisDiffusion", ["mildlypenis"]),
		("PrettyGirlDiffusion", ["sexygirls", "PrettyGirls", "gentlemanboners", "hotofficegirls", "tightdresses", "DLAH"]),
	)
]
# Tabular view of the same mapping (not referenced elsewhere in this notebook).
sources_df = pd.DataFrame(sources)

def add_source(x: object, source_list) -> str:
	"""Return the name of the first source whose subreddit list contains ``x['subreddit']``.

	``x`` is anything indexable by ``'subreddit'`` (a DataFrame row here);
	``source_list`` is a sequence of ``{"name": ..., "data": [...]}`` records.
	Returns "" when no source matches (or ``source_list`` is empty).
	"""
	matches = (entry['name'] for entry in source_list if x['subreddit'] in entry['data'])
	return next(matches, "")

In [None]:
%%time

# Label every post with the model its subreddit belongs to ("" when none matches).
filtered['model'] = filtered.progress_apply(lambda row: add_source(row, sources), axis=1)

display(filtered.shape, filtered)

In [None]:
from io import BytesIO
import requests
import hashlib
from PIL import Image


def get_hash_from_path(in_path: str) -> tuple:
	"""Read a file and return ``(md5_hexdigest, content_bytes)``.

	Returns ``("", b"")`` when ``in_path`` does not exist, so callers can always unpack
	two values (the previous bare ``""`` made ``md5, content = ...`` raise ValueError).
	"""
	if not os.path.exists(in_path):
		return "", b""
	with open(in_path, 'rb') as f_:
		content = f_.read()
	return hashlib.md5(content).hexdigest(), content



def fetch_image(x: object, file_list_) -> object:
	with open('log.txt', 'a') as f_image:
		try:
			url = x['original_url']
			subreddit = x['subreddit']
			image_id = x['id']
			os.makedirs(f"temp\\image\\{subreddit}", exist_ok=True)
			temp_path = f"temp\\image\\{subreddit}\\{image_id}.jpg"
			out_path = f"data/image/{image_id}.jpg"
			if os.path.exists(temp_path):
				md5, content = get_hash_from_path(temp_path)
				if md5 != "f17b01901c752c1bb04928131d1661af" or md5 != "d835884373f4d6c8f24742ceabe74946":
					file_system.upload(temp_path, out_path)
					return out_path
				else:
					return ""
			else:
				response = requests.get(url)
				md5 = hashlib.md5(response.content).hexdigest()
				if md5 != "f17b01901c752c1bb04928131d1661af" or md5 != "d835884373f4d6c8f24742ceabe74946":
					try:
						raw_image = Image.open(BytesIO(response.content))
						raw_image.save(temp_path)
						raw_image.close()
						if out_path in file_list_:
							return out_path
						else:
							file_system.upload(temp_path, out_path)
							return out_path
					except Exception as ex:
						message = write_log_message(x['id'], x['subreddit'], "Failure in fetch_image", ex)
						f_image.write(message)
						return ""
				else:
					return ""
		except Exception as ex:
			message = write_log_message(x['id'], x['subreddit'], "Failure in fetch_image", ex)
			f_image.write(message)
			return ""

In [None]:
%%time

# Remote images already present; fetch_image consults this to decide whether to upload.
file_list = file_system.ls("data/image")
filtered['path'] = filtered.progress_apply(lambda row: fetch_image(row, file_list), axis=1)

display(filtered.shape, filtered)

In [None]:
def set_exists(x: object):
	"""Return whether post ``x``'s cached image (``temp\\image\\<subreddit>\\<id>.jpg``) exists."""
	cached_image = f"temp\\image\\{x['subreddit']}\\{x['id']}.jpg"
	return os.path.exists(cached_image)

In [None]:
%%time

# Flag posts whose image made it into the local cache.
filtered['exists'] = filtered.progress_apply(set_exists, axis=1)

display(filtered.shape, filtered)

In [None]:
def set_hash(x: object):
	"""Return the MD5 hex digest of post ``x``'s cached image, or "" when it is not cached."""
	temp_path = f"temp\\image\\{x['subreddit']}\\{x['id']}.jpg"
	if not os.path.exists(temp_path):
		return ""
	# `with` closes the handle deterministically; the bare open() relied on GC.
	with open(temp_path, 'rb') as image_file:
		return hashlib.md5(image_file.read()).hexdigest()

In [None]:
# Record the MD5 of each cached image ("" for posts without one).
filtered['hash'] = filtered.progress_apply(set_hash, axis=1)
display(filtered.shape, filtered)

In [None]:
from common.captioning.caption import BlipCaption
import random

def apply_caption(x: object, caption_routine: "Sequence[BlipCaption]") -> str:
	"""Caption post ``x``'s cached image with a randomly chosen captioner.

	``x`` is a post row with ``exists``, ``subreddit`` and ``id``; ``caption_routine``
	is a sequence of objects exposing ``caption_image(path)``, one of which is picked
	with ``random.choice`` per call (presumably to spread load — TODO confirm).

	Returns the caption, or "" when the post is not flagged as existing, the cached
	file is missing, or captioning fails (failures are appended to ``log.txt``).
	"""
	if not x['exists']:
		return ""
	temp_path = f"temp\\image\\{x['subreddit']}\\{x['id']}.jpg"
	if not os.path.exists(temp_path):
		return ""
	try:
		return random.choice(caption_routine).caption_image(temp_path)
	except Exception as ex:
		# The log is opened only when there is something to write, not once per row.
		with open('log.txt', 'a') as f_3:
			f_3.write(write_log_message(x['id'], x['subreddit'], "Failure in apply_caption", ex))
		return ""

In [None]:
%%time

# Two captioners constructed with ids "0" and "1" (presumably device indices — TODO
# confirm); apply_caption picks between them at random.
caption_0, caption_1 = BlipCaption("0"), BlipCaption("1")

In [None]:
%%time

# Caption each cached image ("" for posts without an image or on failure).
filtered['caption'] = filtered.progress_apply(
	lambda row: apply_caption(row, [caption_0, caption_1]),
	axis=1,
)

display(filtered.shape, filtered)

In [None]:
%%time

# Keep only posts that received a non-empty caption.
filtered_more = filtered[filtered['caption'].ne("")]
display(filtered_more.shape, filtered_more)

In [None]:
%%time

# Drop columns that are entirely null, then renumber rows 0..n-1 (the id index is discarded).
dropped = filtered_more.dropna(axis=1, how='all').reset_index(drop=True)
display(dropped)

In [None]:
%%time

dropped.to_parquet("filtered.parquet", compression='gzip', schema=schema)  # local gzip checkpoint of the captioned rows

In [None]:
%%time

# Read the local checkpoint back to inspect what was written.
bar = pd.read_parquet(
	"filtered.parquet",
)
display(bar)

In [None]:
%%time

# Append the newly captioned posts to curated_data (read from
# data/parquet/back.parquet, the path the commented-out save cell writes back to).
# The previous `pd.concat([f, dropped])` passed `f`, the closed log-file handle left
# over from the scraping cell, so pd.concat raised TypeError.
# NOTE(review): rows already present in curated_data are not de-duplicated here.
concat = pd.concat([curated_data, dropped])
display(concat.shape)
display(concat)

In [None]:
def fix_path(x:object, fl: []) -> str:
	"""Return the remote image path to store for post ``x``.

	Keeps ``x['path']`` when it appears in ``fl`` (a listing of remote paths).
	Otherwise returns ``data/image/<id>.jpg`` for posts flagged as existing, and ""
	for the rest.
	"""
	current_path, exists = x['path'], x['exists']
	if current_path in fl:
		return current_path
	return f"data/image/{x['id']}.jpg" if exists else ""

In [None]:
%%time

# Re-list the remote images (earlier cells may have uploaded more) and repair paths.
file_list_ = file_system.ls("data/image")
concat['path'] = concat.progress_apply(lambda row: fix_path(row, file_list_), axis=1)
display(concat)

In [None]:
# %%time

# concat.to_parquet("data/parquet/back.parquet", schema=schema, compression='gzip', filesystem=file_system)
# new = pd.read_parquet("data/parquet/back.parquet", engine='pyarrow', schema=schema, filesystem=file_system)

In [None]:
# %%time

# display(new.shape)
# display(new)

In [None]:
# %%time
#
# display("== Clean Up ==")
# os.rmdir("temp")