In [None]:
import json
import os

import tweepy

from twitter_authentication import API_KEY, API_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET, BEARER_TOKEN

# V1 AUTH
#auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
#auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
#tweepy.Client(bearer_token=BEARER_TOKEN, consumer_key=API_KEY, consumer_secret=API_SECRET, access_token=ACCESS_TOKEN, access_token_secret=ACCESS_TOKEN_SECRET)

# Client built from the bearer token only; later cells call client.search_recent_tweets on it.
client = tweepy.Client(bearer_token=BEARER_TOKEN)

In [None]:
# Module-level placeholder; the functions below receive or build their own url_list.
url_list = []

def save_list(vehicle_type, url_list):
    """Write a vehicle's URL list to ./dataset/<vehicle_type>/img_urls.json.

    Args:
        vehicle_type: Name of the vehicle folder under ./dataset/.
        url_list: JSON-serialisable list to store. In this notebook each entry
            is [url, status, media_type, source].

    The vehicle folder is created when it does not exist yet, and the file is
    written inside a ``with`` block so the handle is closed even if the write
    fails.
    """
    path = "./dataset/" + vehicle_type + "/img_urls.json"
    # Serialise first: a list that cannot be dumped must not truncate an existing file.
    json_string = json.dumps(url_list)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as json_file:
        json_file.write(json_string)

def read_list(vehicle_type: str):
    """Load the URL list stored in ./dataset/<vehicle_type>/img_urls.json.

    Args:
        vehicle_type: Name of the vehicle folder under ./dataset/.

    Returns:
        The decoded JSON content of the file. An empty list is returned when
        the file does not exist yet (first run for a vehicle) or when its
        content is not valid JSON (for example an empty file).
    """
    path = "./dataset/" + vehicle_type + "/img_urls.json"
    try:
        # ``with`` closes the handle; the original left it open.
        with open(path, "r") as file_object:
            json_content = file_object.read()
    except FileNotFoundError:
        # Nothing has been saved for this vehicle yet.
        return []

    try:
        return json.loads(json_content)
    except json.JSONDecodeError:
        # Empty or unparsable file: start from an empty list.
        return []

In [None]:
# Users of interest: UAWeapons, OSINTua, RALee85, praisethesteph, 200_zoka, oryxspioenkop, Arslon_Xudosi
# from:twitterdev
# query_t72 = 'T-72 has:images -is:retweet (lang:en OR lang:ru OR lang:uk)'
# query_mtlb = '(MT-LB OR MT-LBV OR MT-LBVM OR MT-LBVMK OR MT-LBVM/K) -is:retweet from:UAWeapons'

# Each entry pairs a vehicle model with the search terms used for it.
# - The model name is also the folder name under ./dataset/ (it is passed to read_list/save_list).
# - The query content is wrapped in parentheses and combined with further filters by the collection cell.
# NOTE(review): the T-72 entry lists 'T-72B3' twice; possibly one of them was meant to be another variant - confirm.
query_vehicle_list = [  # (model,query content)
    ('M113', 'M113'),
    ('MT-LB','MT-LB OR MT-LBV OR MT-LBVM OR MT-LBVMK OR MT-LBVM/K OR MT-LBu'),
    ('BTR-80','BTR-80 OR BTR-82'),
    ('BTR-82A','BTR-80A OR BTR-82A'),
    ('BMP-1','BMP-1'),
    ('BMP-2','BMP-2 OR BMP-2M'),
    ('BMP-3','BMP-3 OR BMP-3M'),
    ('T-62','T-62 OR T-62M OR T-62MV'),
    ('T-64','T-64 OR T-64A OR T-64B OR T-64BV OR T-64B1M OR T-64BM OR T-64BM2'),
    ('T-72','T-72 OR T-72A OR T-72AV OR T-72AMT OR T-72B OR T-72BA OR T-72B3 OR T-72B3 OR T-72M OR T-72M1'),
    ('T-80','T-80 OR T-80B OR T-80BV OR T-80U OR T-80BVM'),
    ('T-90','T-90 OR T-90A OR T-90M'),
    ('2S1','2S1 OR Gvozdika'),
    ('2S3','2S3 OR Akatsiya'),
    ('2S19','2S19 OR 2S19M OR 2S19M1 OR 2S19M2 OR Msta OR Msta-S OR Msta-SM2'),
    ('BM-21','BM-21'),
]

In [None]:
# img_urls.json holds a list of lists. Each item has this structure: [url, status, media_type, source]

# For every configured vehicle: search recent tweets, collect the media URLs
# returned with each page and append them to that vehicle's saved URL list.
for vehicle in query_vehicle_list:
    model, query_content = vehicle

    url_counter = 0  # number of URLs appended during this run
    url_list = read_list(model)

    # query = '(' + query_content + ') has:images -is:retweet'
    query = '(' + query_content + ') (russian OR ukrainian OR russia OR ukraine OR DNR OR DPR OR LNR OR LPR OR captured OR kherson OR kharkiv OR oblast OR donetsk OR severodonetsk OR luhansk OR lugansk OR Dnieper OR Dnipro OR izium OR izyum OR offensive OR attack OR repulsed) has:images -is:retweet'

    paginator = tweepy.Paginator(client.search_recent_tweets, query=query, max_results=100, limit=1000, expansions=['attachments.media_keys'], media_fields=['url'])
    for page in paginator:
        # A page without media (for example an empty result set) has no
        # 'media' key in its includes; indexing it directly raised KeyError.
        for item in page.includes.get('media', []):
            if item.url is None:
                continue
            # [url, status, media_type, source]
            if 'pbs.twimg.com/media/' in item.url:
                url_list.append([item.url,'unknown','image','twitter'])
                url_counter += 1
            elif 'twitter.com/'  in item.url:
                url_list.append([item.url,'unknown','video','twitter'])
                url_counter += 1
            # Any other URL is not stored, so it is not counted either.

    save_list(model, url_list)
    print('Saved', url_counter, model)

In [None]:
#https://stackoverflow.com/questions/70854869/tweepy-problem-retrieving-username-informations-with-twitter-v2-api
#https://stackoverflow.com/questions/72016766/tweepy-only-lets-me-get-100-results-how-do-i-get-more-ive-read-about-paginati

In [None]:
# Function deletes duplicate entries from a vehicle's saved URL list: exact copies, and repeated URLs that differ only in their metadata.
from itertools import combinations

def remove_url_duplicates(vehicle_type):
    """Remove duplicate URL entries from a vehicle's saved list and save it back.

    Entries are [url, status, media_type, source]. Exactly one entry is kept
    per URL. When the same URL appears several times, an entry whose status is
    not 'unknown' is preferred. This usually happens when a URL was retrieved
    from the Twitter API with status 'unknown' and the same one came from
    Oryx's site with a real status.

    Args:
        vehicle_type: Name of the vehicle folder under ./dataset/.

    Unlike a set-based approach, the single pass below keeps the list in its
    original order, so the saved file does not get reshuffled on every run,
    and the printed number is the real amount of removed entries.
    """
    url_list = read_list(vehicle_type)
    initial_amount = len(url_list)

    # url -> entry to keep; dicts remember insertion order.
    kept_by_url = {}
    for entry in url_list:
        url, status = entry[0], entry[1]
        if url not in kept_by_url:
            kept_by_url[url] = entry
        elif kept_by_url[url][1] == 'unknown' and status != 'unknown':
            # Same URL seen before without status information: prefer this one.
            kept_by_url[url] = entry

    url_list = list(kept_by_url.values())

    save_list(vehicle_type, url_list)
    print(vehicle_type, '-', initial_amount - len(url_list), 'duplicated URLs deleted.')

#remove_url_duplicates('M113')

In [None]:
# Function removes all non-twitter URLs from .json files and applies the new structure to the data. Done to include status, media type and source to each URL.
# [url, status, media_type, source]

def clear_url_list(vehicle_type):
    """Rewrite a vehicle's saved URL list in the [url, status, media_type, source] layout.

    Entries that are already lists are kept untouched. Any other entry that is
    not None is kept only if it contains a Twitter marker:
    'pbs.twimg.com/media/' becomes an image entry, 'twitter.com/' becomes a
    video entry, both with status 'unknown' and source 'twitter'. Everything
    else is dropped. The result is saved back to the vehicle's file.
    """
    converted = []

    for entry in read_list(vehicle_type):
        if isinstance(entry, list):
            # Already in the new structure.
            converted.append(entry)
            continue
        if entry is None:
            continue
        if 'pbs.twimg.com/media/' in entry:
            # The media URL belongs to an image.
            converted.append([entry, 'unknown', 'image', 'twitter'])
        elif 'twitter.com/' in entry:
            # The media URL belongs to a video.
            converted.append([entry, 'unknown', 'video', 'twitter'])

    save_list(vehicle_type, converted)


# The conversion call is commented out, so this loop currently only unpacks each vehicle name.
for item in query_vehicle_list:
    vehicle_type, _ = item
    # clear_url_list(vehicle_type)

In [None]:
import csv
import requests
import os

# Function gets all of the links from Oryx's .csv file that correspond to a vehicle of interest
def _classify_oryx_url(url):
    """Return the (media_type, source) pair used for a URL found in the Oryx CSV."""
    if 'i.postimg.cc/' in url:
        return 'image', 'oryx'
    if 'https://postimg.cc/' in url:
        return 'website', 'oryx'
    if 'pbs.twimg.com/media/' in url:
        return 'image', 'twitter'
    if 'twitter.com/' in url:
        return 'video', 'twitter'
    return 'unknown', 'unknown'


def csv_lookup(vehicle_type):
    """Add the Oryx CSV links of one vehicle to that vehicle's saved URL list.

    The CSV is downloaded to ./dataset/oryx_scrape.csv. Every row whose system
    column contains ``vehicle_type`` is appended to the list as
    [url, status, media_type, source], and the list is saved back.

    Args:
        vehicle_type: Name of the vehicle folder under ./dataset/; also the
            text searched for in the system column.

    Raises:
        requests.RequestException: if the CSV cannot be downloaded. The local
            copy is left untouched in that case instead of being overwritten
            with an error page.
    """
    r = requests.get('https://raw.githubusercontent.com/scarnecchia/oryx_data/main/totals_by_system.csv', timeout=30)
    r.raise_for_status()

    oryx_scrape_path = './dataset/oryx_scrape.csv'
    os.makedirs(os.path.dirname(oryx_scrape_path), exist_ok=True)
    with open(oryx_scrape_path, 'wb') as f:
        f.write(r.content)

    counter = 0
    url_list = read_list(vehicle_type)

    # NOTE(review): quotechar='|' is kept from the original; confirm it matches how the CSV quotes its fields.
    # NOTE(review): the file is assumed to be UTF-8 - confirm.
    with open(oryx_scrape_path, newline='', encoding='utf-8') as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=',', quotechar='|')
        for row in csv_reader:
            # Rows are read by index because they hold more columns than the
            # three used here; shorter (blank or malformed) rows are skipped.
            if len(row) < 5:
                continue
            system, status, url = row[2], row[3], row[4]

            # NOTE(review): this is a substring match, so '2S1' also matches systems named '2S19 ...' - confirm this is intended.
            if vehicle_type in system:
                counter += 1
                media_type, source = _classify_oryx_url(url)
                # [url, status, media_type, source]
                url_list.append([url, status, media_type, source])

    save_list(vehicle_type, url_list)
    print('Added', counter, vehicle_type, 'URLs to the list. Total number:', len(url_list))

# For every configured vehicle: merge in the Oryx links, then drop duplicated entries.
for item in query_vehicle_list:
    vehicle_type, _ = item
    csv_lookup(vehicle_type)
    remove_url_duplicates(vehicle_type)

In [None]:
import os

# Function that counts the amount of .jpg inside a directory. Since the function download_img now assigns names using the very same URL, this function will not be used.
def count_img(vehicle_type):
    """Print how many regular files ending in '.jpg' sit directly inside ./dataset/<vehicle_type>."""
    entries = os.scandir('./dataset/' + vehicle_type)
    jpg_total = sum(
        1
        for entry in entries
        if entry.is_file() and entry.name.endswith('.jpg')
    )
    print(jpg_total)

In [None]:
import requests
import os

# Function downloads images from URLs and assigns them the original name from the same URL. It's done this way in order to prevent downloading the same image twice.
def download_img(url, vehicle_type):
    """Download the image of one URL entry unless it is already on disk.

    Args:
        url: A list entry [url, status, media_type, source], or None.
        vehicle_type: Name of the vehicle folder under ./dataset/.

    Returns:
        'Video' when the entry is a video, 'Downloaded' when a new file was
        written, 'Exists' when the target file was already there, and None
        when nothing was downloaded (entry is None, media type is neither
        'image' nor 'video', or the request failed).

    The file is stored at ./dataset/<vehicle_type>/<status>/<source>/<name>,
    where <name> comes from the URL itself so the same picture is never
    downloaded twice.
    """
    if url is None:
        return None

    # url received = [url, status, media_type, source]
    entry_url, status, media_type, source = url

    if media_type == 'video':
        return 'Video'
    if media_type != 'image':
        # 'website' and 'unknown' entries are not downloaded.
        return None

    url_parts = entry_url.split('/')
    if source == 'twitter':
        file_name = url_parts[-1]
    else:
        # If source is oryx then we concat the two parts of the URL, since the last one is not unique.
        file_name = url_parts[-2] + url_parts[-1]
    full_path = os.path.join('./dataset/', vehicle_type, status, source, file_name)

    if os.path.exists(full_path):
        # Picture already exists
        return 'Exists'

    try:
        # The timeout keeps one dead host from hanging the whole run;
        # raise_for_status keeps error pages from being saved as pictures.
        r = requests.get(entry_url, timeout=30)
        r.raise_for_status()
    except requests.RequestException as exc:
        print('Could not download', entry_url, '-', exc)
        return None

    os.makedirs(os.path.dirname(full_path), exist_ok=True) # Creates folder if it doesn't exist previously
    with open(full_path, 'wb') as f:
        f.write(r.content)
    return 'Downloaded'

# For every configured vehicle: de-duplicate its URL list, download every entry
# and report what happened to each of them.
for item in query_vehicle_list:
    counter_downloaded = 0
    counter_pic_exists = 0
    counter_tw_videos = 0
    counter_skipped = 0

    vehicle_type, _ = item
    remove_url_duplicates(vehicle_type)
    url_list = read_list(vehicle_type)
    print('+ Out of', len(url_list), vehicle_type, 'URLs:')
    for url in url_list:
        outcome = download_img(url, vehicle_type)
        if outcome == 'Downloaded': # Downloaded successfully
            counter_downloaded += 1
        elif outcome == 'Exists':   # Picture already exists
            counter_pic_exists += 1
        elif outcome == 'Video':    # Link belongs to a Twitter video
            counter_tw_videos += 1
        else:
            # download_img returned None: no picture was downloaded for this
            # entry. These used to be reported as Twitter videos.
            counter_skipped += 1

    print('   -', counter_downloaded, 'new pictures were downloaded')
    print('   -', counter_pic_exists, 'pictures already existed')
    print('   -', counter_tw_videos, 'URLs belonged to Twitter videos')
    print('   -', counter_skipped, 'URLs were skipped (no picture downloaded)')

#download_img('https://twitter.com/UAWeapons/status/1564984095591088129','2S3')

In [None]:
from PIL import Image
from pathlib import Path
import imagehash
import os
import numpy as np
IMAGE_EXTENSIONS = ['.jpg','.jpeg','.bmp','.png', '.gif', '.tiff']

def find_duplicates(folder_path,delete_duplicates,verbose=False):
    """Find images in a folder that share the same average hash.

    Args:
        folder_path: Folder whose direct entries are checked.
        delete_duplicates: When true, every duplicate found is removed from disk.
        verbose: When true, print the duplicates (or how many were deleted).

    Returns:
        The number of duplicates found.

    Only regular, non-empty files whose extension (compared case-insensitively)
    is listed in IMAGE_EXTENSIONS are hashed. Files are visited in sorted name
    order, so the copy that is kept is the same on every run. Files that
    cannot be opened as an image are skipped instead of aborting the scan.
    """
    hash_size = 8

    hashes = {}      # hash -> first file name seen with that hash
    duplicates = []  # later file names sharing an already seen hash

    for image in sorted(os.listdir(folder_path)):
        # Compare the real extension; a substring test also matched names
        # such as 'picture.jpg.json'.
        if os.path.splitext(image)[1].lower() not in IMAGE_EXTENSIONS:
            continue
        image_path = os.path.join(folder_path, image)
        if not os.path.isfile(image_path) or os.path.getsize(image_path) == 0:
            continue
        try:
            with Image.open(image_path) as img:
                temp_hash = imagehash.average_hash(img, hash_size)
        except OSError:
            # Not a readable image (for example a saved error page).
            continue
        if temp_hash in hashes:
            duplicates.append(image)
        else:
            hashes[temp_hash] = image

    # normpath drops a trailing slash, which made split('/')[-1] return ''.
    vehicle_name = os.path.basename(os.path.normpath(folder_path))
    if verbose:
        print('\t',vehicle_name,'- Duplicates:')

    if duplicates:
        if delete_duplicates:
            for img in duplicates:
                os.remove(os.path.join(folder_path,img))
            if verbose:
                print('\t\t',len(duplicates),'deleted')
        elif verbose:
            for img in duplicates:
                print('\t\t',img)
    elif verbose:
        print('\t\tNo duplicates found')

    return len(duplicates)

def find_similar(image_path,folder_path,delete_duplicates,similarity,verbose=False):
    """Find images in a folder whose average hash is close to a reference image.

    Args:
        image_path: Path of the reference image.
        folder_path: Folder whose direct entries are compared with it.
        delete_duplicates: When true, every close duplicate found is removed from disk.
        similarity: Required similarity in percent; it is turned into the
            maximum number of hash bits that may differ.
        verbose: When true, print the reference name and the duplicates.

    Returns:
        The number of close duplicates found. This is 0 when the reference
        image is empty or cannot be opened as an image.

    The entry of the folder that has the same file name as the reference is
    never reported, so the reference itself is not deleted. Files that cannot
    be opened as an image are skipped.
    """
    hash_size = 8

    fnames = os.listdir(folder_path)
    threshold = 1 - similarity/100
    diff_limit = int(threshold*(hash_size**2))

    duplicates = []
    # basename copes with the path separators of the running platform.
    image_name = os.path.basename(image_path)

    # hash1 stays None when there is nothing to compare against; the original
    # left it unbound in that case and failed later with a NameError.
    hash1 = None
    if os.path.getsize(image_path) > 0:
        try:
            with Image.open(image_path) as img:
                hash1 = imagehash.average_hash(img, hash_size).hash
        except OSError:
            hash1 = None

    if hash1 is not None:
        for image in fnames:
            if image == image_name: # Makes sure the original picture is not added to the list of duplicates
                continue
            if os.path.splitext(image)[1].lower() not in IMAGE_EXTENSIONS:
                continue
            candidate_path = os.path.join(folder_path,image)
            if not os.path.isfile(candidate_path) or os.path.getsize(candidate_path) == 0:
                continue
            try:
                with Image.open(candidate_path) as img:
                    hash2 = imagehash.average_hash(img, hash_size).hash
            except OSError:
                # Not a readable image.
                continue
            if np.count_nonzero(hash1 != hash2) <= diff_limit:
                duplicates.append(image)

    if verbose:
        print('\tOriginal:')
        print('\t\t',image_name)
        print('\tDuplicates:')

    if duplicates:
        if delete_duplicates:
            for img in duplicates:
                os.remove(os.path.join(folder_path,img))
            if verbose:
                print('\t\t',len(duplicates))
        elif verbose:
            for img in duplicates:
                print('\t\t',img)
    elif verbose:
        print('\t\tNo close duplicates found')

    return len(duplicates)
    
#find_similar('./dd/2S1 - Copy/R0r6NcvQ5577.png','./dd/2S1 - Copy/',False,97,True)

In [None]:
#find_duplicates('./dd/orig1/',True)

#for item in query_vehicle_list:

# Runs the near-duplicate check (97% similarity, deleting matches) once for every entry of ./dd/c3/.
folder_path = './dd/c3/'
fnames = os.listdir(folder_path)  # listing is taken once, before any file is deleted
for image in fnames:
    image_path = os.path.join(folder_path,image)
    if Path(image_path).is_file(): # Checks if the file exists, since it could've been deleted inside the function
        find_similar(image_path,folder_path,True,97)

In [None]:
# For every configured vehicle: delete images with an identical hash first,
# then delete close duplicates (97% similarity) of each remaining image.
# NOTE(review): download_img stores pictures under ./dataset/<vehicle_type>/<status>/<source>/,
# while os.listdir below only lists the top level of ./dataset/<vehicle_type> -
# confirm that the images are expected directly in this folder.
for item in query_vehicle_list:
    vehicle_type, _ = item
    folder_path = './dataset/' + vehicle_type
    amount_duplicates = find_duplicates(folder_path, True)
    
    total_close_duplicates = 0
    folder_path += '/'
    fnames = os.listdir(folder_path)  # listing is taken once, before close duplicates are deleted
    for image in fnames:
        if any(x in image for x in IMAGE_EXTENSIONS):
            image_path = os.path.join(folder_path,image)
            if Path(image_path).is_file(): # Checks if the file exists, since it could've been deleted inside the function
                amount_close_duplicates = find_similar(image_path,folder_path,True,97)
                total_close_duplicates += amount_close_duplicates
    print(vehicle_type, '-', amount_duplicates, 'duplicates and', total_close_duplicates, 'close duplicates deleted.')

