Big Data Analytics Framework for Efficient Management & Preservation of Digital Archives & Libraries

**Libraries**

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
!pip install opencv-python --index-url=https://pypi.python.org/simple/

In [None]:
!pip install moviepy 

In [None]:
!pip install SpeechRecognition

In [None]:
!pip install pydub

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import shutil
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

**Data Sorting**

In [None]:
def categorize_and_move_files(directory_path):
    """Sort the files directly inside ``directory_path`` into type sub-folders.

    The sub-folders ``videos``, ``pdfs``, ``images`` and ``unclassified`` are
    created inside ``directory_path`` when missing. Every regular file found at
    the top level is then moved into the folder matching its (case-insensitive)
    extension; files with any other extension go to ``unclassified``.
    Sub-directories are not touched. One line is printed per moved file.
    """
    fallback_folder = 'unclassified'

    # Extension -> sub-folder lookup; anything not listed uses the fallback.
    folder_by_extension = {
        '.mp4': 'videos',
        '.avi': 'videos',
        '.mkv': 'videos',
        '.dat': 'videos',
        '.pdf': 'pdfs',
        '.jpg': 'images',
        '.jpeg': 'images',
        '.png': 'images',
        '.gif': 'images',
    }

    # Make sure every target sub-folder exists before anything is moved.
    for subfolder in ('videos', 'pdfs', 'images', fallback_folder):
        target_dir = os.path.join(directory_path, subfolder)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

    for entry in os.listdir(directory_path):
        source = os.path.join(directory_path, entry)

        # Directories (including the ones created above) are left in place.
        if not os.path.isfile(source):
            continue

        suffix = os.path.splitext(entry)[1].lower()
        chosen = folder_by_extension.get(suffix, fallback_folder)

        shutil.move(source, os.path.join(directory_path, chosen, entry))
        print(f"Moved {entry} to {chosen} folder.")

            
# Sort everything directly under ./data into the type sub-folders used by the
# analysis cells below. This moves files on disk; sub-folders are skipped by
# categorize_and_move_files, so files sorted by an earlier run are not moved again.
directory_path = './data'
categorize_and_move_files(directory_path)


**Video Data Reading/Analysis**

In [None]:
# Function to get video features
def get_video_info(video_path):
    """Read basic stream properties of a video file through OpenCV.

    Returns:
        A tuple ``(num_frames, resolution, duration, frame_rate, codec)`` where
        ``resolution`` is ``(width, height)``, ``duration`` is
        ``num_frames / frame_rate`` (0 when the reported frame rate is not
        positive) and ``codec`` is the four characters decoded from the
        FOURCC property.
    """
    capture = cv2.VideoCapture(video_path)

    # Query every property first, then release the capture handle.
    frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = capture.get(cv2.CAP_PROP_FPS)
    fourcc_value = int(capture.get(cv2.CAP_PROP_FOURCC))

    capture.release()

    # Guard against a zero/negative frame rate to avoid dividing by zero.
    if fps > 0:
        seconds = frame_total / fps
    else:
        seconds = 0

    # The FOURCC integer packs four character codes, lowest byte first.
    fourcc_text = ''.join(chr((fourcc_value >> shift) & 0xFF) for shift in (0, 8, 16, 24))

    return frame_total, (width, height), seconds, fps, fourcc_text

In [None]:
# Directory path
directory_path = './data/videos'

# Start from an empty list so video_file_info_df is always (re)defined below.
# Without this, a missing directory leaves the name undefined (NameError in
# later cells on a fresh kernel) or silently keeps a stale frame from a
# previous run.
file_info_list = []

# Ensure the directory path is valid
if not os.path.isdir(directory_path):
    print("Invalid directory path.")
else:
    # os.listdir returns entries in arbitrary order; sort for reproducible rows.
    for filename in sorted(os.listdir(directory_path)):
        file_path = os.path.join(directory_path, filename)

        # Check if it's a file
        if os.path.isfile(file_path):
            # Extract filename, extension, and filepath
            stem, extension = os.path.splitext(filename)
            file_info = {
                'filename': stem,  # Store filename without extension
                'extension': extension,
                'filepath': os.path.normpath(file_path)  # Normalize file path
            }
            # Get video information
            num_frames, resolution, duration, frame_rate, codec = get_video_info(file_path)
            file_info['num_frames'] = num_frames
            file_info['resolution'] = resolution
            file_info['duration'] = duration
            file_info['frame_rate'] = frame_rate
            file_info['codec'] = codec

            file_info_list.append(file_info)

# Create a DataFrame from the list of file information (empty when nothing was found)
video_file_info_df = pd.DataFrame(file_info_list)


In [None]:
# Display the DataFrame
video_file_info_df.head(10)

**Data Extraction From Files**

In [None]:
from moviepy.editor import VideoFileClip
import speech_recognition as sr
import tempfile
from pydub import AudioSegment


# Function to extract audio from video file using moviepy
def extract_audio(video_file):
    """Open ``video_file`` with moviepy and return the clip's ``audio`` attribute.

    NOTE(review): the VideoFileClip opened here is never closed by this
    function; the returned audio object is what callers write to disk.
    """
    return VideoFileClip(video_file).audio

def chunk_audio_and_save(audio_path, chunk_length=5000):  # chunk_length in milliseconds
    """Split a WAV file into consecutive pieces and export each one as WAV.

    Pieces are sliced at offsets ``0, chunk_length, 2 * chunk_length, ...`` and
    written to ``./temp_chunk_<i>.wav`` in the current working directory.

    Returns:
        The list of exported chunk paths, in order.
    """
    recording = AudioSegment.from_wav(audio_path)
    total_length = len(recording)

    exported_paths = []
    for number, start in enumerate(range(0, total_length, chunk_length)):
        piece_path = f"./temp_chunk_{number}.wav"
        recording[start:start + chunk_length].export(piece_path, format="wav")
        exported_paths.append(piece_path)

    return exported_paths

# Function to convert audio to text using SpeechRecognition
def audio_to_text(filename, audio_file):
    """Transcribe an audio clip with the Google recognizer of SpeechRecognition.

    The clip is written to ``./audio/<filename>.wav`` (the folder is created
    when missing), split into chunks by ``chunk_audio_and_save`` and every
    chunk is passed to ``Recognizer.recognize_google``.

    Args:
        filename: Base name (without extension) for the exported WAV file.
        audio_file: Object exposing ``write_audiofile(path)``, i.e. the value
            returned by ``extract_audio``.

    Returns:
        The chunk transcripts joined by single spaces. Chunks in which no
        speech could be recognised contribute nothing.

    Raises:
        ValueError: If ``audio_file`` is None (no audio to transcribe).
        Any other error (for example a failed recognition request) propagates
        to the caller after the temporary chunk files have been removed.
    """
    if audio_file is None:
        raise ValueError(f"No audio track available for {filename}")

    # write_audiofile does not create missing parent folders.
    audio_dir = './audio'
    os.makedirs(audio_dir, exist_ok=True)
    audio_file_name = f'{audio_dir}/{filename}.wav'
    audio_file.write_audiofile(audio_file_name)
    chunk_file_paths = chunk_audio_and_save(audio_file_name)

    # Initialize recognizer
    r = sr.Recognizer()
    parts = []

    try:
        for i, file_path in enumerate(chunk_file_paths):
            print(f"Transcribing chunk {i+1}/{len(chunk_file_paths)}...")
            # Load the audio file
            with sr.AudioFile(file_path) as source:
                data = r.record(source)

            # Convert speech to text
            try:
                part_text = r.recognize_google(data)
            except sr.UnknownValueError:
                # Silent or unintelligible chunk: skip it instead of discarding
                # the transcript collected so far.
                continue
            parts.append(part_text)
    finally:
        # Always clean up the chunk files, including after an error.
        for file_path in chunk_file_paths:
            if os.path.exists(file_path):
                os.remove(file_path)

    # Separate chunk transcripts so words at chunk borders do not run together.
    return " ".join(parts)

# Iterate through each row in the DataFrame and transcribe its audio track.
# Only .mp4 / .dat rows are processed; other extensions present in the frame
# (categorize_and_move_files also sorts .avi and .mkv into the videos folder)
# are skipped and get no 'audio_text' value.
for index, row in video_file_info_df.iterrows():
    # Check if it's a video file (.mp4, .dat)
    if row['extension'].lower() in ['.mp4', '.dat']:
        try:
            # 'filepath' was stored through os.path.normpath, which can produce
            # backslashes; convert them back to forward slashes here.
            video_path = "./" + row['filepath'].replace('\\', '/')
            print(video_path)
            # Extract audio from video
            audio_clip = extract_audio(video_path)

            # Convert audio to text
            audio_text = audio_to_text(row['filename'], audio_clip)

            # Update 'audio_text' column in the DataFrame
            video_file_info_df.at[index, 'audio_text'] = audio_text
        except Exception as e:
            # Best effort: report the failure and continue with the next row.
            print(f"Error processing {row['filename']}: {e}")


In [None]:
# Display the updated DataFrame
video_file_info_df.head()

**Image Data Reading/Analysis**

In [None]:
!pip install pytesseract

In [None]:
import pytesseract
from PIL import Image

# Use the default Windows install location only when that executable actually
# exists. On any other machine the assignment is skipped, so pytesseract keeps
# its own default command instead of pointing at a path that is not there.
_windows_tesseract = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
if os.path.isfile(_windows_tesseract):
    pytesseract.pytesseract.tesseract_cmd = _windows_tesseract


In [None]:
# Function to calculate image noise 
def calculate_noise(image):
    """Return the intensity spread of ``image`` after BGR -> grayscale conversion.

    The value is the first row of the standard-deviation result of
    ``cv2.meanStdDev`` on the grayscale image.
    NOTE(review): that indexing looks like it yields a length-1 array rather
    than a plain float — confirm before using the column numerically.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _mean, stddev = cv2.meanStdDev(grayscale)
    return stddev[0]

In [None]:
# Function to get dominant color
def get_dominant_color(image):
    """Return the most frequent 3-channel colour of ``image``.

    Args:
        image: Array whose elements can be reshaped to ``(-1, 3)``.
            Assumes 8-bit channel values (0-255) — TODO confirm for callers
            that pass anything other than an image read by cv2.imread.

    Returns:
        A 3-tuple of ints in the image's own channel order. When several
        colours are equally frequent, the lexicographically smallest triple is
        returned. An image without pixels yields ``(0, 0, 0)``.
    """
    # Reshape image to a list of pixels; widen so the packing below cannot overflow
    pixels = image.reshape((-1, 3)).astype(np.int64)

    if pixels.shape[0] == 0:
        return (0, 0, 0)

    # Pack each pixel into one integer and count occurrences. This avoids
    # allocating a dense 256 x 256 x 256 histogram (about 16.7 million bins)
    # for every image.
    codes = (pixels[:, 0] << 16) | (pixels[:, 1] << 8) | pixels[:, 2]
    values, counts = np.unique(codes, return_counts=True)

    # np.unique returns sorted values and argmax returns the first maximum, so
    # ties resolve to the smallest packed colour.
    top = int(values[np.argmax(counts)])

    return (top >> 16, (top >> 8) & 0xFF, top & 0xFF)

In [None]:
# Function to extract text using OCR
def extract_text(image_path):
    """Run Tesseract OCR on the image at ``image_path`` and return the text.

    The image is opened with Pillow inside a ``with`` block so the handle is
    closed again once OCR has finished.
    """
    with Image.open(image_path) as img_pil:
        # Perform OCR using Tesseract
        return pytesseract.image_to_string(img_pil)

In [None]:
# Function to get image information
def get_image_info(image_path):
    """Collect file metadata, pixel statistics and OCR text for one image.

    Args:
        image_path: Path of the image file.

    Returns:
        A dict with the keys ``filename``, ``extension``, ``filepath``,
        ``resolution``, ``num_pixels``, ``is_grayscale``, ``noise``, ``size``,
        ``mean_intensity``, ``std_intensity``, ``min_intensity``,
        ``max_intensity``, ``num_channels``, ``dominant_color``,
        ``aspect_ratio`` and ``text``.

    Raises:
        ValueError: If OpenCV cannot decode the file.
    """
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread reports failure by returning None instead of raising;
        # fail with a clear message rather than an AttributeError further down.
        raise ValueError(f"OpenCV could not read image: {image_path}")

    # The default imread mode always returns a 3-channel BGR array, so the
    # channel layout of the file has to come from an unconverted decode.
    # Without it is_grayscale is always False and num_channels always 3.
    img_unchanged = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
    if img_unchanged is None:
        img_unchanged = img

    # Get basic image information
    filename = os.path.splitext(os.path.basename(image_path))[0]
    extension = os.path.splitext(image_path)[1]
    resolution = img.shape[:2]  # (height, width), i.e. array shape order
    num_pixels = img.size  # number of array elements of the BGR decode
    is_grayscale = img_unchanged.ndim < 3
    noise = calculate_noise(img)
    size = os.path.getsize(image_path)

    mean_intensity = img.mean()
    std_intensity = img.std()
    min_intensity = img.min()
    max_intensity = img.max()

    # Color channels as stored in the file
    num_channels = img_unchanged.shape[2] if img_unchanged.ndim == 3 else 1

    # Dominant color (computed on the BGR decode)
    dominant_color = get_dominant_color(img)

    # Aspect ratio: height / width, following the order of ``resolution``
    aspect_ratio = resolution[0] / resolution[1] if resolution[1] != 0 else 0

    # Extract text using OCR
    text = extract_text(image_path)

    return {
        'filename': filename,
        'extension': extension,
        'filepath': os.path.normpath(image_path),
        'resolution': resolution,
        'num_pixels': num_pixels,
        'is_grayscale': is_grayscale,
        'noise': noise,
        'size': size,
        'mean_intensity': mean_intensity,
        'std_intensity': std_intensity,
        'min_intensity': min_intensity,
        'max_intensity': max_intensity,
        'num_channels': num_channels,
        'dominant_color': dominant_color,
        'aspect_ratio': aspect_ratio,
        'text': text
    }

In [None]:
# Directory path
directory_path = './data/images'

# Start from an empty list so image_file_info_df is always (re)defined below,
# even when the directory is missing.
file_info_list = []

# Ensure the directory path is valid
if not os.path.isdir(directory_path):
    print("Invalid directory path.")
else:
    # os.listdir returns entries in arbitrary order; sort for reproducible rows.
    for filename in sorted(os.listdir(directory_path)):
        image_path = os.path.join(directory_path, filename)

        # Check if it's an image file
        if os.path.isfile(image_path) and image_path.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
            # Get image information; one unreadable file must not abort the scan
            try:
                image_info = get_image_info(image_path)
            except Exception as e:
                print(f"Error processing {filename}: {e}")
                continue
            file_info_list.append(image_info)

# Create a DataFrame from the list of file information (empty when nothing was found)
image_file_info_df = pd.DataFrame(file_info_list)


In [None]:
# Display the DataFrame
image_file_info_df.head()

In [None]:
image_file_info_df.info()

In [None]:
image_file_info_df.describe()

In [None]:
image_file_info_df['extension'].value_counts()

In [None]:
# Group by extension and calculate average size and number of pixels
group_by_extension = image_file_info_df.groupby('extension').agg({
    'size': 'mean',
    'num_pixels': 'mean'
}).reset_index()

group_by_extension

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix

# Features and target for a grayscale-vs-colour classifier.
# NOTE(review): get_image_info sets num_channels to 1 whenever is_grayscale is
# True, so this feature leaks the label; the reported accuracy says little
# about the other features.
X = image_file_info_df[['size', 'num_pixels', 'mean_intensity', 'num_channels']]
y = image_file_info_df['is_grayscale']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train a RandomForestClassifier.
# The forest is seeded like the split; unseeded, its metrics change from run to run.
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train, y_train)

# Make predictions and evaluate accuracy
predictions = clf.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
conf_matrix = confusion_matrix(y_test, predictions)

# Display metrics
print(f'Accuracy: {accuracy}')

print(f'Confusion Matrix:\n{conf_matrix}')


**Visualization**

In [None]:
import matplotlib.pyplot as plt

# Plot histogram for image size
image_file_info_df['size'].hist(bins=20)
plt.title('Histogram of Image Sizes')
plt.xlabel('Size (bytes)')
plt.ylabel('Frequency')
plt.show()


In [None]:
import seaborn as sns

# Box plot for image size
sns.boxplot(x=image_file_info_df['size'])
plt.title('Box Plot of Image Sizes')
plt.xlabel('Size (bytes)')
plt.show()


In [None]:
#correlation plot
# Correlate only numeric/boolean columns. The frame also holds text and other
# object columns (filename, extension, OCR text, ...), which corr() rejects on
# pandas versions that no longer drop non-numeric columns automatically.
correlation_matrix = image_file_info_df.select_dtypes(include=['number', 'bool']).corr()
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', linewidths=.5)
plt.title('Correlation Matrix')
plt.show()

In [None]:
#pairplot
numerical_features = ['size', 'num_pixels', 'mean_intensity', 'num_channels']
sns.pairplot(image_file_info_df[numerical_features])
plt.suptitle('Pairplot of Numerical Features', y=1.02)
plt.show()


In [None]:
#class distribution
plt.figure(figsize=(8, 6))
sns.countplot(data=image_file_info_df, x='is_grayscale')
plt.title('Class Distribution (Grayscale vs. Color)')
plt.xlabel('Image Type')
plt.ylabel('Count')
plt.show()


In [None]:
#scatterplot 3D
from mpl_toolkits.mplot3d import Axes3D

fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')

ax.scatter(image_file_info_df['num_pixels'], image_file_info_df['mean_intensity'], image_file_info_df['size'], c='blue', s=10)
ax.set_xlabel('Num Pixels')
ax.set_ylabel('Mean Intensity')
ax.set_zlabel('Size (bytes)')

plt.title('3D Scatter Plot of Numerical Features')
plt.show()


In [None]:
#box plot of image numerical features 
plt.figure(figsize=(14, 8))

for i, feature in enumerate(numerical_features, 1):
    plt.subplot(2, 2, i)
    sns.boxplot(data=image_file_info_df, x='is_grayscale', y=feature)
    plt.title(f'Box Plot of {feature} by Image Type')

plt.tight_layout()
plt.show()



In [None]:
# Violin plots of each numerical feature, split by image type (is_grayscale).
# Relies on `numerical_features` defined in the pairplot cell above; the
# 2 x 2 grid has room for exactly the four features in that list.
plt.figure(figsize=(14, 8))

for i, feature in enumerate(numerical_features, 1):
    plt.subplot(2, 2, i)
    sns.violinplot(data=image_file_info_df, x='is_grayscale', y=feature)
    plt.title(f'Violin Plot of {feature} by Image Type')

plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(12, 6))
sns.countplot(data=image_file_info_df, x='extension')
plt.title('Count of Images by Extension')
plt.xlabel('Image Extension')
plt.ylabel('Count')
plt.xticks(rotation=45)
plt.show()

In [None]:
from pandas.plotting import parallel_coordinates

plt.figure(figsize=(12, 8))
parallel_coordinates(image_file_info_df[numerical_features + ['is_grayscale']], 'is_grayscale', colormap='coolwarm')
plt.title('Parallel Coordinates Plot of Numerical Features')
plt.show()

In [None]:
from pandas.plotting import radviz
 
plt.figure(figsize=(12, 8))
radviz(image_file_info_df[numerical_features + ['is_grayscale']], 'is_grayscale', colormap='coolwarm')
plt.title('RadViz Plot of Numerical Features')
plt.show()

**Natural Language Processing - NLP**

In [None]:
image_file_info_df.head()

In [None]:
!pip install nltk

In [None]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
import string

In [None]:
# Download NLTK resources
nltk.download('stopwords')
nltk.download('punkt')

# Function for text preprocessing
def preprocess_text(text):
    """Normalise ``text`` for the NLP steps that follow.

    Steps, in order: strip every character in ``string.punctuation``, tokenize
    with ``word_tokenize``, drop tokens whose lowercase form is an English
    stopword, stem the remaining tokens with ``PorterStemmer`` and join them
    with single spaces.
    """
    without_punctuation = text.translate(str.maketrans('', '', string.punctuation))

    english_stopwords = set(stopwords.words('english'))
    porter = PorterStemmer()

    kept_stems = []
    for token in word_tokenize(without_punctuation):
        if token.lower() in english_stopwords:
            continue
        kept_stems.append(porter.stem(token))

    return ' '.join(kept_stems)

# Apply preprocessing to the 'text' column
image_file_info_df['processed_text'] = image_file_info_df['text'].apply(preprocess_text)

In [None]:
# Display the DataFrame with the processed text column
image_file_info_df[['text', 'processed_text']].head(50)

In [None]:
from nltk import FreqDist

# Function to plot word frequency
def plot_word_frequency(text):
    """Plot the 20 most frequent tokens of ``text`` as an NLTK frequency plot.

    ``text`` is tokenized with ``word_tokenize`` and counted with ``FreqDist``.
    """
    tokens = word_tokenize(text)
    freq_dist = FreqDist(tokens)
    # NOTE(review): depending on the NLTK version, FreqDist.plot may already
    # show the figure itself, in which case the title set below would land on
    # a new, empty figure — confirm against the installed version.
    freq_dist.plot(20, cumulative=False)
    plt.title('Top 20 Most Frequent Words')
    plt.show()

# Apply word frequency analysis to a sample
plot_word_frequency(image_file_info_df['processed_text'].iloc[0])

In [None]:
from nltk.sentiment import SentimentIntensityAnalyzer
nltk.download('vader_lexicon')

# Function for sentiment analysis
def perform_sentiment_analysis(text):
    """Return the ``polarity_scores`` dict of NLTK's SentimentIntensityAnalyzer for ``text``.

    The analyzer is created on the first call and cached on the function
    object, so applying this function to every row of a DataFrame does not
    rebuild it each time.
    """
    analyzer = getattr(perform_sentiment_analysis, '_analyzer', None)
    if analyzer is None:
        analyzer = SentimentIntensityAnalyzer()
        perform_sentiment_analysis._analyzer = analyzer
    return analyzer.polarity_scores(text)

# Apply sentiment analysis to a sample
image_file_info_df['sentiment_scores'] = image_file_info_df['processed_text'].apply(perform_sentiment_analysis)
image_file_info_df[['processed_text', 'sentiment_scores']].head()


In [None]:
from nltk import pos_tag, ne_chunk
from nltk.tokenize import word_tokenize
nltk.download('maxent_ne_chunker')
nltk.download('words')
nltk.download('averaged_perceptron_tagger')

# Function for named entity recognition
def extract_named_entities(text):
    """Tokenize, POS-tag and chunk ``text``; return the result of ``ne_chunk``."""
    return ne_chunk(pos_tag(word_tokenize(text)))

# Apply named entity recognition to a sample
sample_text = image_file_info_df['processed_text'].iloc[0]
named_entities = extract_named_entities(sample_text)
print(named_entities)

**PDF Data Reading/Analysis**

In [None]:
!pip install pdf2image

In [None]:
!pip install PyMuPDF

In [None]:
import fitz  # PyMuPDF

In [None]:
# Function to extract text content from PDF
def get_text_content(doc):
    """Concatenate ``get_text()`` of every page of ``doc`` in page order.

    ``doc`` must expose ``page_count`` and integer indexing that returns page
    objects. A document without pages yields an empty string.
    """
    return ''.join(
        doc[page_index].get_text()
        for page_index in range(doc.page_count)
    )

# Function to get the number of images in a PDF
def get_num_images(doc):
    """Return the total length of ``get_images(full=True)`` over all pages of ``doc``.

    ``doc`` must expose ``page_count`` and integer indexing that returns page
    objects. A document without pages yields 0.
    """
    return sum(
        len(doc[page_index].get_images(full=True))
        for page_index in range(doc.page_count)
    )

In [None]:
# Function to get PDF information
def get_pdf_info(pdf_path):
    """Collect file metadata and content statistics for one PDF.

    Args:
        pdf_path: Path of the PDF file.

    Returns:
        A dict with the keys ``filename``, ``extension``, ``filepath``,
        ``num_pages``, ``size`` (bytes on disk), ``text_content``,
        ``num_words`` (whitespace-separated tokens of the text) and
        ``num_images``.
    """
    doc = fitz.open(pdf_path)
    try:
        num_pages = doc.page_count

        # Additional PDF-related information
        text_content = get_text_content(doc)
        num_images = get_num_images(doc)
    finally:
        # Close the document even when text or image extraction fails.
        doc.close()

    # Get basic PDF information
    filename = os.path.splitext(os.path.basename(pdf_path))[0]
    extension = os.path.splitext(pdf_path)[1]
    size = os.path.getsize(pdf_path)
    num_words = len(text_content.split())

    return {
        'filename': filename,
        'extension': extension,
        'filepath': os.path.normpath(pdf_path),
        'num_pages': num_pages,
        'size': size,
        'text_content': text_content,
        'num_words': num_words,
        'num_images': num_images
    }



In [None]:

# Directory path
directory_path = './data/pdfs'

# Start from an empty list so pdf_file_info_df is always (re)defined below,
# even when the directory is missing.
file_info_list = []

# Ensure the directory path is valid
if not os.path.isdir(directory_path):
    print("Invalid directory path.")
else:
    # os.listdir returns entries in arbitrary order; sort for reproducible rows.
    for filename in sorted(os.listdir(directory_path)):
        pdf_path = os.path.join(directory_path, filename)

        # Check if it's a PDF file
        if os.path.isfile(pdf_path) and pdf_path.lower().endswith('.pdf'):
            # Get PDF information; one unreadable file must not abort the scan
            try:
                pdf_info = get_pdf_info(pdf_path)
            except Exception as e:
                print(f"Error processing {filename}: {e}")
                continue
            file_info_list.append(pdf_info)

# Create a DataFrame from the list of file information (empty when nothing was found)
pdf_file_info_df = pd.DataFrame(file_info_list)


In [None]:
# Display the DataFrame
pdf_file_info_df.head()

**Saving DataFrames In Excel**

In [None]:
# Workbook is written to the current working directory.
excel_file_path = 'dataframes.xlsx'

# Create an Excel writer object
# NOTE(review): engine='xlsxwriter' needs the XlsxWriter package, which is not
# among the pip installs in the cells shown — confirm it is available.
with pd.ExcelWriter(excel_file_path, engine='xlsxwriter') as writer:
    # Write each DataFrame to a different sheet
    pdf_file_info_df.to_excel(writer, sheet_name='PDF_Info', index=False)
    image_file_info_df.to_excel(writer, sheet_name='Image_Info', index=False)
    video_file_info_df.to_excel(writer, sheet_name='Video_Info', index=False)

print(f'Excel file "{excel_file_path}" has been created with three sheets.')


In [None]:
# Generate a report summarizing the findings
def generate_report(dataframes, output_path='data_analysis_report.txt', names=None):
    """Write one plain-text analysis report covering every given DataFrame.

    For each frame the report contains the ``DataFrame.info()`` listing, the
    ``describe()`` summary statistics and the correlation matrix of its
    numeric/boolean columns. All sections go into a single file.

    Args:
        dataframes: Iterable of pandas DataFrames to summarise.
        output_path: Destination text file; overwritten on every call.
        names: Optional sequence of section titles, one per frame. Frames
            without a title are labelled "DataFrame 1", "DataFrame 2", ...

    Returns:
        None. Prints a confirmation after writing, or a message when there is
        nothing to report (in which case no file is written).
    """
    import io  # local import: only needed to capture DataFrame.info() output

    frames = list(dataframes) if dataframes is not None else []
    if not frames:
        print("Analysis results are None. Cannot generate the report.")
        return

    sections = []
    for position, df in enumerate(frames):
        if names is not None and position < len(names):
            title = names[position]
        else:
            title = f"DataFrame {position + 1}"

        # DataFrame.info() prints its listing and returns None, so capture the
        # text in a buffer instead of formatting the return value.
        info_buffer = io.StringIO()
        df.info(buf=info_buffer)

        # describe() raises on a frame without columns.
        if df.shape[1] > 0:
            summary = str(df.describe())
        else:
            summary = "(no columns)"

        # Correlation is only defined for numeric data; text/object columns are left out.
        numeric_columns = df.select_dtypes(include=['number', 'bool'])
        correlation = str(numeric_columns.corr())

        section = f"{title} Data Analysis Report\n\n"
        section += "Data Columns Info:\n"
        section += info_buffer.getvalue() + "\n\n"
        section += "Summary Statistics:\n"
        section += summary + "\n\n"
        section += "Correlation Matrix:\n"
        section += correlation + "\n"
        sections.append(section)

    # Write once, after the loop, so every frame's section ends up in the file.
    with open(output_path, 'w', encoding='utf-8') as file:
        file.write("\n".join(sections))

    print("Data analysis report generated and saved.")

all_dataframes = [image_file_info_df,video_file_info_df,pdf_file_info_df]
generate_report(all_dataframes)